diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..dbb21a8 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +cargo fmt --all + +if ! git diff --quiet; then + echo "cargo fmt changed files. Please review and stage the formatting changes." + exit 1 +fi + +cargo check --all-targets --all-features +cargo test --all-targets --all-features diff --git a/Cargo.lock b/Cargo.lock index c2cd1a8..42645a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "anstream" version = "1.0.0" @@ -64,6 +73,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + [[package]] name = "clap" version = "4.6.1" @@ -251,6 +266,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.185" @@ -263,6 +284,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + [[package]] name = "lsp-types" version = "0.95.1" @@ -276,12 +303,36 @@ dependencies = [ "url", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "once_cell" +version = "1.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" + [[package]] name = "once_cell_polyfill" version = "1.70.2" @@ -294,6 +345,12 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + [[package]] name = "potential_utf" version = "0.1.5" @@ -321,6 +378,23 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "serde" version = "1.0.228" @@ -375,6 +449,15 @@ dependencies = [ "syn", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "smallvec" version = "1.15.1" @@ -396,6 +479,8 @@ dependencies = [ "lsp-types", "serde_json", "subprocess", + "tracing", + "tracing-subscriber", ] [[package]] @@ -436,6 +521,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tinystr" version = "0.8.3" @@ -446,6 +540,67 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", +] + [[package]] name = "unicode-ident" version = "1.0.24" @@ -477,6 +632,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 75e5675..3fb36c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,11 +4,257 @@ version = "0.1.0" edition = "2024" license = "MIT" +description = "A simple STDIO forwarder over TCP that also offers a health check port." +repository = "https://github.com/ninjaneers-team/stdioxide" +readme = "README.md" +keywords = ["tcp", "stdio", "tcp-proxy", "language-server", "health-check"] +categories = ["command-line-utilities", "network-programming", "development-tools"] + [dependencies] anyhow = "1.0.102" clap = { version = "4.6.1", features = ["derive", "env"] } subprocess = "1.0.3" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] lsp-types = "0.95" serde_json = "1.0" + +[lints.rust] +###################################################################################################################### +# Broad rustc lint groups +###################################################################################################################### + +# Future-proofing and edition migration. +future_incompatible = { level = "warn", priority = -1 } +rust_2018_idioms = { level = "warn", priority = -1 } +rust_2021_compatibility = { level = "warn", priority = -1 } +rust_2024_compatibility = { level = "warn", priority = -1 } + +# Useful rustc lint groups. +keyword_idents = { level = "warn", priority = -1 } +let_underscore = { level = "warn", priority = -1 } +nonstandard_style = { level = "warn", priority = -1 } +refining_impl_trait = { level = "warn", priority = -1 } +unknown_or_malformed_diagnostic_attributes = { level = "warn", priority = -1 } +unused = { level = "warn", priority = -1 } + +###################################################################################################################### +# Lints commonly overridden depending on project state or type +###################################################################################################################### + +# These can be noisy during early development, but are useful before release. +missing_docs = "warn" +dead_code = "warn" + +# Disable if this reports false positives for build-dependencies or target-specific deps. +unused_crate_dependencies = "warn" + +###################################################################################################################### +# Rust 2024 / compatibility lints worth keeping explicit +###################################################################################################################### + +boxed_slice_into_iter = "warn" +dependency_on_unit_never_type_fallback = "warn" +deprecated_safe_2024 = "warn" +edition_2024_expr_fragment_specifier = "warn" +if_let_rescope = "warn" +impl_trait_overcaptures = "warn" +keyword_idents_2024 = "warn" +missing_unsafe_on_extern = "warn" +never_type_fallback_flowing_into_unsafe = "warn" +rust_2024_guarded_string_incompatible_syntax = "warn" +rust_2024_incompatible_pat = "warn" +rust_2024_prelude_collisions = "warn" +static_mut_refs = "warn" +tail_expr_drop_order = "warn" +unsafe_attr_outside_unsafe = "warn" + +###################################################################################################################### +# Lints that should only be overridden for a small scope with a comment explaining the exception +###################################################################################################################### + +absolute_paths_not_starting_with_crate = "warn" +elided_lifetimes_in_paths = "warn" +explicit_outlives_requirements = "warn" +ffi_unwind_calls = "warn" +let_underscore_drop = "warn" +let_underscore_lock = "warn" +macro_use_extern_crate = "warn" +meta_variable_misuse = "warn" +missing_abi = "warn" +non_ascii_idents = "warn" +rust_2021_incompatible_closure_captures = "warn" +rust_2021_incompatible_or_patterns = "warn" +rust_2021_prefixes_incompatible_syntax = "warn" +rust_2021_prelude_collisions = "warn" +single_use_lifetimes = "warn" +trivial_casts = "warn" +trivial_numeric_casts = "warn" +unit_bindings = "warn" +unreachable_pub = "warn" +unsafe_code = "forbid" +unsafe_op_in_unsafe_fn = "warn" +unstable_features = "warn" +unused_extern_crates = "warn" +unused_import_braces = "warn" +unused_lifetimes = "warn" +unused_macro_rules = "warn" +unused_qualifications = "warn" +variant_size_differences = "warn" + +# Unstable lints available on nightly only: +# fuzzy_provenance_casts = "warn" +# lossy_provenance_casts = "warn" +# multiple_supertrait_upcastable = "warn" +# must_not_suspend = "warn" +# non_exhaustive_omitted_patterns = "warn" +# unnameable_types = "warn" + +[lints.clippy] +###################################################################################################################### +# Broad Clippy groups +###################################################################################################################### + +all = { level = "warn", priority = -1 } +pedantic = { level = "warn", priority = -1 } +nursery = { level = "warn", priority = -1 } + +# checks Cargo.toml quality, feature naming, duplicate crate versions, wildcard deps, etc. +cargo = { level = "warn", priority = -1 } + +###################################################################################################################### +# Lints commonly overridden depending on project state or type +###################################################################################################################### + +# These are noisy during early development, but useful before release. +missing_panics_doc = "warn" +missing_errors_doc = "warn" + +# Disable or locally allow until proper error handling has been implemented. +unwrap_used = "warn" +panic = "warn" +todo = "warn" +expect_used = "warn" +missing_assert_message = "warn" +unwrap_in_result = "warn" +indexing_slicing = "warn" +panic_in_result_fn = "warn" + +# Libraries should generally not write to stdout/stderr. +# For CLI binaries, locally allow these in the presentation / command layer with a reason. +print_stderr = "warn" +print_stdout = "warn" + +# These two lints can interfere. Keep one allowed globally and handle the other locally. +pattern_type_mismatch = "allow" +needless_borrowed_reference = "warn" + +###################################################################################################################### +# Cargo / manifest lints +###################################################################################################################### + +cargo_common_metadata = "warn" +multiple_crate_versions = "warn" +negative_feature_names = "warn" +redundant_feature_names = "warn" +wildcard_dependencies = "warn" + +###################################################################################################################### +# Extra strict restriction lints worth considering +###################################################################################################################### + +arithmetic_side_effects = "warn" +as_conversions = "warn" +as_pointer_underscore = "warn" +big_endian_bytes = "warn" +default_numeric_fallback = "warn" +else_if_without_else = "warn" +exhaustive_enums = "warn" +exhaustive_structs = "warn" +field_scoped_visibility_modifiers = "warn" +float_arithmetic = "warn" +impl_trait_in_params = "warn" +integer_division = "warn" +little_endian_bytes = "warn" +missing_docs_in_private_items = "warn" +pub_use = "warn" +ref_patterns = "warn" +same_name_method = "warn" +str_to_string = "warn" +string_add = "warn" +unnecessary_self_imports = "warn" +unseparated_literal_suffix = "warn" +wildcard_enum_match_arm = "warn" + +###################################################################################################################### +# Maintenance lints that are generally overridden, but useful for cleanup +###################################################################################################################### + +# Noisy, but useful for periodic cleanup. +shadow_unrelated = "warn" + +###################################################################################################################### +# Lints that should only be overridden for a small scope with a comment explaining the exception +###################################################################################################################### + +absolute_paths = "warn" +alloc_instead_of_core = "warn" +allow_attributes = "warn" +allow_attributes_without_reason = "warn" +assertions_on_result_states = "warn" +clone_on_ref_ptr = "warn" +create_dir = "warn" +dbg_macro = "warn" +default_union_representation = "warn" +deref_by_slicing = "warn" +disallowed_script_idents = "warn" +empty_drop = "warn" +empty_enum_variants_with_brackets = "warn" +empty_structs_with_brackets = "warn" +error_impl_error = "warn" +exit = "warn" +filetype_is_file = "warn" +float_cmp_const = "warn" +fn_to_numeric_cast_any = "warn" +format_push_string = "warn" +get_unwrap = "warn" +host_endian_bytes = "warn" +if_then_some_else_none = "warn" +infinite_loop = "warn" +inline_asm_x86_att_syntax = "warn" +inline_asm_x86_intel_syntax = "warn" +large_include_file = "warn" +let_underscore_must_use = "warn" +let_underscore_untyped = "warn" +lossy_float_literal = "warn" +map_err_ignore = "warn" +mem_forget = "warn" +min_ident_chars = "warn" +mixed_read_write_in_expression = "warn" +mod_module_files = "warn" +modulo_arithmetic = "warn" +module_name_repetitions = "warn" +multiple_inherent_impl = "allow" +multiple_unsafe_ops_per_block = "warn" +mutex_atomic = "warn" +needless_raw_strings = "warn" +pub_without_shorthand = "warn" +rc_buffer = "warn" +rc_mutex = "warn" +redundant_type_annotations = "warn" +rest_pat_in_fully_bound_structs = "warn" +semicolon_inside_block = "warn" +shadow_same = "warn" +single_char_lifetime_names = "warn" +string_lit_chars_any = "warn" +string_slice = "warn" +suspicious_xor_used_as_pow = "warn" +tests_outside_test_module = "warn" +try_err = "warn" +undocumented_unsafe_blocks = "warn" +unimplemented = "warn" +unnecessary_safety_comment = "warn" +unnecessary_safety_doc = "warn" +verbose_file_reads = "warn" diff --git a/README.md b/README.md index 2feaeea..485a5dc 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,16 @@ cargo test This project was developed with AI assistance using GitHub Copilot and Claude Sonnet 4.5. +### Pre-commit Hooks + +To enable pre-commit hooks that run tests and linting before commits, run: + +```bash +git config core.hooksPath .githooks +``` + +This will ensure code quality checks run automatically before each commit. + ## License See LICENSE file for details. diff --git a/src/app.rs b/src/app.rs index 0fd5e09..3e29c84 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,3 +1,8 @@ +//! Application entry point that orchestrates the TCP servers and child process lifecycle. +//! +//! Spawns all the server threads (protocol, stderr, health) and manages the child process +//! coordination loop. + use std::{ net::TcpListener, sync::{Arc, mpsc}, @@ -12,7 +17,34 @@ use crate::{ servers::{health::health_server, protocol::protocol_server, stderr::stderr_server}, }; -pub fn run(args: Args) -> Result<(), anyhow::Error> { +/// Run the stdioxide forwarder with the given configuration. +/// +/// This function: +/// 1. Starts the child process with the specified command and arguments +/// 2. Binds TCP listeners on the configured ports (protocol, stderr, health) +/// 3. Spawns threads to pump child process output into shared buffered state +/// 4. Spawns server threads to handle client connections on each port +/// 5. Coordinates child process lifecycle and graceful shutdown +/// +/// The function blocks until the child process exits or is terminated. +/// +/// # Errors +/// +/// Returns an error if: +/// - Port binding fails (port already in use or insufficient permissions) +/// - Child process fails to start +/// - Child process coordinator thread panics +/// +/// # Example +/// +/// ```no_run +/// use stdioxide::{app, args::Args}; +/// use clap::Parser; +/// +/// let args = Args::parse(); +/// app::run(&args).expect("Failed to run stdioxide"); +/// ``` +pub fn run(args: &Args) -> Result<(), anyhow::Error> { let protocol_listener = TcpListener::bind(("0.0.0.0", args.protocol_port))?; let stderr_listener = TcpListener::bind(("0.0.0.0", args.stderr_port))?; let health_listener = TcpListener::bind(("0.0.0.0", args.health_port))?; @@ -27,14 +59,14 @@ pub fn run(args: Args) -> Result<(), anyhow::Error> { { let stdout_state = Arc::clone(&stdout_state); thread::spawn(move || { - let _ = pump_output_to_state(child.stdout, stdout_state, "stdout"); + drop(pump_output_to_state(child.stdout, &stdout_state, "stdout")); }); } { let stderr_state = Arc::clone(&stderr_state); thread::spawn(move || { - let _ = pump_output_to_state(child.stderr, stderr_state, "stderr"); + drop(pump_output_to_state(child.stderr, &stderr_state, "stderr")); }); } @@ -42,7 +74,12 @@ pub fn run(args: Args) -> Result<(), anyhow::Error> { let stdout_state = Arc::clone(&stdout_state); let control_tx = control_tx.clone(); thread::spawn(move || { - let _ = protocol_server(protocol_listener, stdout_state, child.stdin, control_tx); + drop(protocol_server( + &protocol_listener, + stdout_state, + child.stdin, + control_tx, + )); }); } @@ -50,7 +87,7 @@ pub fn run(args: Args) -> Result<(), anyhow::Error> { let stderr_state = Arc::clone(&stderr_state); let control_tx = control_tx.clone(); thread::spawn(move || { - let _ = stderr_server(stderr_listener, stderr_state, control_tx); + drop(stderr_server(&stderr_listener, &stderr_state, &control_tx)); }); } @@ -60,16 +97,16 @@ pub fn run(args: Args) -> Result<(), anyhow::Error> { { thread::spawn(move || { - let _ = health_server(health_listener); + health_server(&health_listener); }); } let coordinator_thread: JoinHandle> = - thread::spawn(move || run_child_coordinator(child.job, control_rx)); + thread::spawn(move || run_child_coordinator(&child.job, &control_rx)); coordinator_thread .join() - .expect("Failed to join coordinator thread")?; + .map_err(|error| anyhow::anyhow!("Child coordinator thread panicked: {error:?}"))??; Ok(()) } diff --git a/src/args.rs b/src/args.rs index 44b1cea..08625fd 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,18 +1,54 @@ +use std::{ + env::{self, args_os}, + ffi::OsString, +}; + use clap::Parser; +/// Trait for abstracting environment variable access. +/// +/// This allows dependency injection of environment providers in tests, +/// avoiding the need to mutate process-wide environment variables. +pub trait Env { + /// Get an environment variable value. + fn var(&self, key: &str) -> Option; +} + +/// Production environment provider that reads from the process environment. +#[derive(Debug, Default, Clone, Copy)] +#[non_exhaustive] +pub struct ProcessEnv; + +impl Env for ProcessEnv { + fn var(&self, key: &str) -> Option { + env::var(key).ok() + } +} + +/// Command-line arguments for stdioxide. +/// +/// Configures the TCP ports and child process to launch. Ports can be specified +/// via command-line flags or environment variables (flags take precedence). +/// +/// # Example +/// +/// ```bash +/// stdioxide --protocol-port 7000 --stderr-port 7001 --health-port 7002 python script.py --arg1 --arg2 +/// ``` #[derive(Parser, Debug)] #[command(version, about, long_about = None)] +#[non_exhaustive] pub struct Args { /// The port to use for forwarding stdin and stdout. - #[arg(long, env = "STDIOXIDE_PROTOCOL_PORT", default_value_t = 7000)] + #[arg(long, default_value_t = 7000)] pub protocol_port: u16, /// The port to use for forwarding stderr. - #[arg(long, env = "STDIOXIDE_STDERR_PORT", default_value_t = 7001)] + #[arg(long, default_value_t = 7001)] pub stderr_port: u16, /// The port to use for health checks. - #[arg(long, env = "STDIOXIDE_HEALTH_PORT", default_value_t = 7002)] + #[arg(long, default_value_t = 7002)] pub health_port: u16, /// The command to run as a subprocess. @@ -24,147 +60,275 @@ pub struct Args { pub args: Vec, } +impl Args { + /// Parse arguments from the process environment and command line. + /// + /// This is the main entry point for production use. + #[expect( + clippy::same_name_method, + reason = "We want to mimic the API of clap’s `parse()` method while adding environment variable support" + )] + #[must_use] + pub fn parse() -> Self { + Self::parse_from_env(&ProcessEnv) + } + + /// Parse arguments using a custom environment provider. + /// + /// This method allows dependency injection of environment variables, + /// which is useful for testing without mutating global state. + pub fn parse_from_env(env: &E) -> Self { + Self::parse_from_env_and_args(env, args_os()) + } + + /// Parse arguments from a custom environment and argument iterator. + /// + /// This is the most flexible parsing method, used internally and in tests. + pub fn parse_from_env_and_args, A: IntoIterator>( + env: &E, + args: A, + ) -> Self { + // Build argument list with environment variable defaults injected. + let mut arg_vec: Vec = args.into_iter().map(Into::into).collect(); + + // Check if ports are provided via CLI; if not, inject from environment. + let has_protocol_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--protocol-port")); + let has_stderr_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--stderr-port")); + let has_health_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--health-port")); + + // Inject environment variables as CLI args if not already present. + let mut insertions = Vec::new(); + + if !has_protocol_port && let Some(port) = env.var("STDIOXIDE_PROTOCOL_PORT") { + insertions.push("--protocol-port".to_owned()); + insertions.push(port); + } + if !has_stderr_port && let Some(port) = env.var("STDIOXIDE_STDERR_PORT") { + insertions.push("--stderr-port".to_owned()); + insertions.push(port); + } + if !has_health_port && let Some(port) = env.var("STDIOXIDE_HEALTH_PORT") { + insertions.push("--health-port".to_owned()); + insertions.push(port); + } + // Insert environment-derived args after the program name but before other args. + if !insertions.is_empty() && !arg_vec.is_empty() { + let rest = arg_vec.split_off(1); + arg_vec.extend(insertions.into_iter().map(Into::into)); + arg_vec.extend(rest); + } + + Self::try_parse_from(arg_vec).unwrap_or_else(|error| error.exit()) + } +} + +#[cfg(test)] +impl Args { + /// Parse arguments from a custom environment and argument iterator for testing. + /// + /// Returns a Result instead of exiting on error, allowing tests to verify failure cases. + fn try_parse_from_env_and_args, A: IntoIterator>( + env: &E, + args: A, + ) -> Result { + // Build argument list with environment variable defaults injected + let mut arg_vec: Vec = args.into_iter().map(Into::into).collect(); + + // Check if ports are provided via CLI; if not, inject from environment + let has_protocol_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--protocol-port")); + let has_stderr_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--stderr-port")); + let has_health_port = arg_vec + .iter() + .any(|arg| arg.to_str() == Some("--health-port")); + + // Inject environment variables as CLI args if not already present + let mut insertions = Vec::new(); + + if !has_protocol_port && let Some(port) = env.var("STDIOXIDE_PROTOCOL_PORT") { + insertions.push("--protocol-port".to_owned()); + insertions.push(port); + } + if !has_stderr_port && let Some(port) = env.var("STDIOXIDE_STDERR_PORT") { + insertions.push("--stderr-port".to_owned()); + insertions.push(port); + } + if !has_health_port && let Some(port) = env.var("STDIOXIDE_HEALTH_PORT") { + insertions.push("--health-port".to_owned()); + insertions.push(port); + } + + // Insert environment-derived args after the program name but before other args. + if !insertions.is_empty() && !arg_vec.is_empty() { + let rest = arg_vec.split_off(1); + arg_vec.extend(insertions.into_iter().map(Into::into)); + arg_vec.extend(rest); + } + + Self::try_parse_from(arg_vec) + } +} + #[cfg(test)] mod tests { use super::*; - use std::sync::Mutex; - - // Mutex to serialize tests that modify environment variables. - // This prevents interference between parallel test execution. - static ENV_MUTEX: Mutex<()> = Mutex::new(()); - - /// RAII helper for temporarily setting environment variables in tests. - /// Automatically restores the original state when dropped. - struct EnvVar { - key: String, - original_value: Option, - } - - impl EnvVar { - /// Set an environment variable temporarily. - fn set(key: impl Into, value: impl AsRef) -> Self { - let key = key.into(); - let original_value = std::env::var(&key).ok(); - unsafe { - std::env::set_var(&key, value.as_ref()); - } + use std::collections::HashMap; + + /// Test environment provider backed by an in-memory hashmap. + /// + /// This allows testing environment variable behavior without mutating + /// the process-wide environment state. + #[derive(Debug, Default)] + struct TestEnv { + values: HashMap, + } + + impl TestEnv { + fn new(values: impl IntoIterator) -> Self { Self { - key, - original_value, + values: values + .into_iter() + .map(|(key, value)| (key.to_owned(), value.to_owned())) + .collect(), } } } - impl Drop for EnvVar { - fn drop(&mut self) { - unsafe { - match &self.original_value { - Some(value) => std::env::set_var(&self.key, value), - None => std::env::remove_var(&self.key), - } - } + impl Env for TestEnv { + fn var(&self, key: &str) -> Option { + self.values.get(key).cloned() } } #[test] - fn test_default_port_values() { - let _lock = ENV_MUTEX.lock().unwrap(); - let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + fn test_default_port_values() -> Result<(), anyhow::Error> { + let env = TestEnv::default(); + let args = Args::try_parse_from_env_and_args(&env, ["stdioxide", "echo"])?; assert_eq!(args.protocol_port, 7000); assert_eq!(args.stderr_port, 7001); assert_eq!(args.health_port, 7002); + Ok(()) } #[test] - fn test_custom_port_values_via_args() { - let args = Args::try_parse_from([ - "stdioxide", - "--protocol-port", - "8000", - "--stderr-port", - "8001", - "--health-port", - "8002", - "echo", - ]) - .unwrap(); + fn test_custom_port_values_via_args() -> Result<(), anyhow::Error> { + let env = TestEnv::default(); + let args = Args::try_parse_from_env_and_args( + &env, + [ + "stdioxide", + "--protocol-port", + "8000", + "--stderr-port", + "8001", + "--health-port", + "8002", + "echo", + ], + )?; assert_eq!(args.protocol_port, 8000); assert_eq!(args.stderr_port, 8001); assert_eq!(args.health_port, 8002); + Ok(()) } #[test] - fn test_command_and_args() { - let args = Args::try_parse_from(["stdioxide", "python", "-m", "http.server"]).unwrap(); + fn test_command_and_args() -> Result<(), anyhow::Error> { + let env = TestEnv::default(); + let args = + Args::try_parse_from_env_and_args(&env, ["stdioxide", "python", "-m", "http.server"])?; assert_eq!(args.command, "python"); assert_eq!(args.args, vec!["-m", "http.server"]); + Ok(()) } #[test] - fn test_args_with_hyphen_values() { - let args = Args::try_parse_from(["stdioxide", "myapp", "--flag", "-value"]).unwrap(); + fn test_args_with_hyphen_values() -> Result<(), anyhow::Error> { + let env = TestEnv::default(); + let args = + Args::try_parse_from_env_and_args(&env, ["stdioxide", "myapp", "--flag", "-value"])?; assert_eq!(args.command, "myapp"); assert_eq!(args.args, vec!["--flag", "-value"]); + Ok(()) } #[test] - fn test_empty_args() { - let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + fn test_empty_args() -> Result<(), anyhow::Error> { + let env = TestEnv::default(); + let args = Args::try_parse_from_env_and_args(&env, ["stdioxide", "echo"])?; assert_eq!(args.command, "echo"); assert!(args.args.is_empty()); + Ok(()) } #[test] fn test_missing_command_fails() { - let result = Args::try_parse_from(["stdioxide"]); - assert!(result.is_err()); + let env = TestEnv::default(); + let result = Args::try_parse_from_env_and_args(&env, ["stdioxide"]); + assert!( + result.is_err(), + "Expected parsing to fail when command is missing" + ); } #[test] - fn test_env_var_protocol_port() { - let _lock = ENV_MUTEX.lock().unwrap(); - let _env = EnvVar::set("STDIOXIDE_PROTOCOL_PORT", "9000"); - let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + fn test_env_var_protocol_port() -> Result<(), anyhow::Error> { + let env = TestEnv::new([("STDIOXIDE_PROTOCOL_PORT", "9000")]); + let args = Args::try_parse_from_env_and_args(&env, ["stdioxide", "echo"])?; assert_eq!(args.protocol_port, 9000); + Ok(()) } #[test] - fn test_env_var_stderr_port() { - let _lock = ENV_MUTEX.lock().unwrap(); - let _env = EnvVar::set("STDIOXIDE_STDERR_PORT", "9001"); - let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + fn test_env_var_stderr_port() -> Result<(), anyhow::Error> { + let env = TestEnv::new([("STDIOXIDE_STDERR_PORT", "9001")]); + let args = Args::try_parse_from_env_and_args(&env, ["stdioxide", "echo"])?; assert_eq!(args.stderr_port, 9001); + Ok(()) } #[test] - fn test_env_var_health_port() { - let _lock = ENV_MUTEX.lock().unwrap(); - let _env = EnvVar::set("STDIOXIDE_HEALTH_PORT", "9002"); - let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + fn test_env_var_health_port() -> Result<(), anyhow::Error> { + let env = TestEnv::new([("STDIOXIDE_HEALTH_PORT", "9002")]); + let args = Args::try_parse_from_env_and_args(&env, ["stdioxide", "echo"])?; assert_eq!(args.health_port, 9002); + Ok(()) } #[test] - fn test_cli_args_override_env_vars() { - let _lock = ENV_MUTEX.lock().unwrap(); - let _env1 = EnvVar::set("STDIOXIDE_PROTOCOL_PORT", "9000"); - let _env2 = EnvVar::set("STDIOXIDE_STDERR_PORT", "9001"); - let _env3 = EnvVar::set("STDIOXIDE_HEALTH_PORT", "9002"); - - let args = Args::try_parse_from([ - "stdioxide", - "--protocol-port", - "8000", - "--stderr-port", - "8001", - "--health-port", - "8002", - "echo", - ]) - .unwrap(); + fn test_cli_args_override_env_vars() -> Result<(), anyhow::Error> { + let env = TestEnv::new([ + ("STDIOXIDE_PROTOCOL_PORT", "9000"), + ("STDIOXIDE_STDERR_PORT", "9001"), + ("STDIOXIDE_HEALTH_PORT", "9002"), + ]); + + let args = Args::try_parse_from_env_and_args( + &env, + [ + "stdioxide", + "--protocol-port", + "8000", + "--stderr-port", + "8001", + "--health-port", + "8002", + "echo", + ], + )?; assert_eq!(args.protocol_port, 8000); assert_eq!(args.stderr_port, 8001); assert_eq!(args.health_port, 8002); + Ok(()) } } diff --git a/src/child.rs b/src/child.rs index f1170f9..2daeb6b 100644 --- a/src/child.rs +++ b/src/child.rs @@ -1,14 +1,30 @@ +//! Child process spawning and stream capture. + +use std::fs; + use subprocess::{Exec, Job, Redirection}; -pub struct StartedChild { +/// A spawned child process with captured `stdin`, `stdout`, and `stderr` streams. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) struct StartedChild { + /// The subprocess `Job` handle for process lifecycle management. pub job: Job, - pub stdin: std::fs::File, - pub stdout: std::fs::File, - pub stderr: std::fs::File, + /// File handle for writing to the child’s `stdin`. + pub stdin: fs::File, + /// File handle for reading from the child’s `stdout`. + pub stdout: fs::File, + /// File handle for reading from the child’s `stderr`. + pub stderr: fs::File, } impl StartedChild { - pub fn start(command: &str, args: &[String]) -> Result { + /// Spawns a child process with the given command and arguments. + /// + /// All three standard streams (`stdin`, `stdout`, `stderr`) are captured as pipes. + pub(crate) fn start(command: &str, args: &[String]) -> Result { let mut process = Exec::cmd(command); for arg in args { process = process.arg(arg); @@ -21,15 +37,15 @@ impl StartedChild { let child_stdin = job .stdin .take() - .ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?; + .ok_or_else(|| anyhow::anyhow!("Failed to capture child `stdin`"))?; let child_stdout = job .stdout .take() - .ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?; + .ok_or_else(|| anyhow::anyhow!("Failed to capture child `stdout`"))?; let child_stderr = job .stderr .take() - .ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?; + .ok_or_else(|| anyhow::anyhow!("Failed to capture child `stderr`"))?; Ok(Self { job, stdin: child_stdin, diff --git a/src/control.rs b/src/control.rs index fe0d4f9..dcab1cb 100644 --- a/src/control.rs +++ b/src/control.rs @@ -1,27 +1,43 @@ -use std::sync::mpsc; +//! Child process lifecycle coordination and control messages. +use std::{sync::mpsc, time::Duration}; use subprocess::Job; +use tracing::info; +/// Messages sent to the child process coordinator to control lifecycle. #[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ControlMessage { +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) enum ControlMessage { + /// Terminate the child process immediately. KillChild, } -pub fn run_child_coordinator( - job: Job, - control_rx: mpsc::Receiver, +/// Monitors the child process and handles termination requests. +/// +/// Polls the child process for exit and listens for control messages to kill it. +/// Returns when the child process exits or is explicitly terminated. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) fn run_child_coordinator( + job: &Job, + control_rx: &mpsc::Receiver, ) -> Result<(), anyhow::Error> { loop { if let Some(status) = job.poll() { - eprintln!("Child process exited with status: {status}"); + info!("Child process exited with status: {status}"); return Ok(()); } - match control_rx.recv_timeout(std::time::Duration::from_millis(100)) { + match control_rx.recv_timeout(Duration::from_millis(100)) { Ok(ControlMessage::KillChild) => { - let _ = job.kill(); + drop(job.kill()); let status = job.wait()?; - eprintln!("Child process killed; exit status: {status}"); + info!("Child process killed; exit status: {status}"); return Ok(()); } Err(mpsc::RecvTimeoutError::Timeout) => { @@ -29,10 +45,10 @@ pub fn run_child_coordinator( } Err(mpsc::RecvTimeoutError::Disconnected) => { // All senders are gone; we terminate the child process and exit. - eprintln!("Control channel disconnected; terminating child process"); - let _ = job.kill(); + info!("Control channel disconnected; terminating child process"); + drop(job.kill()); let status = job.wait()?; - eprintln!("Child process killed; exit status: {status}"); + info!("Child process killed; exit status: {status}"); return Ok(()); } } diff --git a/src/lib.rs b/src/lib.rs index 5be80d2..f3ed91d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,46 @@ +//! A TCP forwarder that exposes a child process’s stdin, stdout, and stderr streams over the network. +//! +//! stdioxide launches an arbitrary child process and forwards its standard streams over two TCP ports, +//! allowing remote interaction with any command-line application. Output is buffered to prevent data +//! loss when no clients are connected. A third TCP port provides health check functionality for +//! container orchestrators. +//! +//! # Architecture +//! +//! - **Protocol Port**: Bidirectional communication for stdin/stdout (single client, kills child on disconnect) +//! - **Stderr Port**: Reconnectable stderr streaming with buffering (single client, child continues on disconnect) +//! - **Health Port**: Simple readiness check endpoint +//! +//! # Example +//! +//! ```no_run +//! use stdioxide::{app, args::Args}; +//! use clap::Parser; +//! +//! let args = Args::parse(); +//! app::run(&args).expect("Failed to run stdioxide"); +//! ``` + +#![allow( + unused_crate_dependencies, + reason = "dev-dependencies available to lib tests" +)] +#![cfg_attr( + test, + allow( + clippy::panic_in_result_fn, + reason = "Using `assert!()`s is idiomatic, but we need to return `Result`s to be able to return I/O-related errors." + ) +)] + pub mod app; + +/// Command-line argument parsing and configuration. +/// +/// Defines the `Args` struct with port configurations and child process command. pub mod args; -pub mod child; -pub mod control; -pub mod output; -pub mod servers; + +pub(crate) mod child; +pub(crate) mod control; +pub(crate) mod output; +pub(crate) mod servers; diff --git a/src/main.rs b/src/main.rs index 8b925df..87b950b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,22 @@ -use clap::Parser; +//! stdioxide binary entry point. +//! +//! Launches the TCP forwarder that exposes a child process’s standard streams over the network. + +#![allow( + unused_crate_dependencies, + reason = "The binary depends on subprocess transitively through the stdioxide library" +)] + +use std::io::stderr; + use stdioxide::{app, args::Args}; fn main() -> Result<(), anyhow::Error> { + tracing_subscriber::fmt() + .with_writer(stderr) + .with_target(false) + .init(); + let args = Args::parse(); - app::run(args) + app::run(&args) } diff --git a/src/output.rs b/src/output.rs index 5aab68b..7602ba1 100644 --- a/src/output.rs +++ b/src/output.rs @@ -1,5 +1,7 @@ +//! Output buffering and stream serving logic. + use std::{ - io::{Read, Write}, + io::{self, Read, Write}, net::TcpStream, sync::{ Arc, Condvar, Mutex, @@ -8,26 +10,52 @@ use std::{ }, }; +use tracing::{debug, info}; + use crate::control::ControlMessage; +/// Defines how output serving should handle client disconnection. #[derive(Debug, Clone)] -pub enum ServingBehavior { +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) enum ServingBehavior { + /// Kill the child process when the client disconnects (protocol port behavior). KillChildOnDisconnect, + /// Keep child alive on disconnect, allow reconnection (`stderr` port behavior). + /// + /// The `Arc` tracks whether a connection is currently active. DoNotKillChildOnDisconnect(Arc), } -pub struct OutputState { +/// Buffered output state from a child process stream. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) struct OutputState { + /// Accumulated output bytes not yet sent to clients. pub buffer: Vec, + /// Whether EOF has been reached on the source stream. pub eof: bool, } -pub struct NotifyableOutputState { +/// Thread-safe output state with condition variable for synchronization. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) struct NotifyableOutputState { + /// Protected output buffer and EOF flag. pub state: Mutex, + /// Condition variable to notify waiters when new output arrives. pub condition_variable: Condvar, } impl NotifyableOutputState { - pub fn new() -> Self { + /// Creates a new empty output state. + pub(crate) fn new() -> Self { Self::default() } } @@ -45,60 +73,74 @@ impl Default for NotifyableOutputState { } /// Pumps data from the given `source` (either `stdout` or `stderr` of the child process) into the shared `state`. -pub fn pump_output_to_state( +/// +/// Continuously reads from the source and appends to the buffer, notifying waiters on each read. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) fn pump_output_to_state( mut source: impl Read, - output_state: Arc, + output_state: &Arc, label: &'static str, ) -> Result<(), anyhow::Error> { loop { - let mut buffer = [0u8; 8192]; + let mut buffer = [0_u8; 8192]; let num_bytes_read = source.read(&mut buffer)?; - let mut guard = output_state - .state - .lock() - .expect("Failed to lock output state"); + { + let mut guard = output_state.state.lock().map_err(|error| { + anyhow::anyhow!("Failed to lock output state for {label}: {error}") + })?; - if num_bytes_read == 0 { - eprintln!("[{label}] EOF reached"); - guard.eof = true; - output_state.condition_variable.notify_all(); - break; - } + if num_bytes_read == 0 { + debug!("[{label}] EOF reached"); + guard.eof = true; + output_state.condition_variable.notify_all(); + break; + } - let chunk = &buffer[..num_bytes_read]; - guard.buffer.extend_from_slice(chunk); + let chunk = buffer.get(..num_bytes_read).unwrap_or_default(); + guard.buffer.extend_from_slice(chunk); + } output_state.condition_variable.notify_all(); } Ok(()) } -pub fn serve_output_on_stream( +/// Serves output from the shared `state` to the given `stream`. +/// +/// Waits for output to become available, then writes it to the TCP stream. +/// Handles disconnection according to the specified `serving_behavior`. +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) fn serve_output_on_stream( mut stream: TcpStream, - output_state: Arc, - control_tx: mpsc::Sender, - serving_behavior: ServingBehavior, + output_state: &Arc, + control_tx: &mpsc::Sender, + serving_behavior: &ServingBehavior, label: &'static str, ) -> Result<(), anyhow::Error> { loop { let buffered_data = { - let mut guard = output_state - .state - .lock() - .expect("Failed to lock stdout state"); + let mut guard = output_state.state.lock().map_err(|error| { + anyhow::anyhow!("Failed to lock stdout state for {label}: {error}") + })?; while guard.buffer.is_empty() && !guard.eof { // Wait until there’s either new output to send or we’ve reached EOF. guard = output_state .condition_variable .wait(guard) - .expect("Failed to wait on condition variable"); + .map_err(|error| { + anyhow::anyhow!("Failed to wait on condition variable for {label}: {error}") + })?; } if guard.buffer.is_empty() && guard.eof { - eprintln!( - "[{label}] EOF reached and no buffered output; closing client connection" - ); + debug!("[{label}] EOF reached and no buffered output; closing client connection"); return Ok(()); } @@ -110,17 +152,16 @@ pub fn serve_output_on_stream( let mut num_bytes_written = 0; while num_bytes_written < buffered_data.len() { - match stream.write(&buffered_data[num_bytes_written..]) { + match stream.write(buffered_data.get(num_bytes_written..).unwrap_or_default()) { Ok(0) => { // Treat as connection no longer writable. break; } Ok(n) => { - num_bytes_written += n; + num_bytes_written = num_bytes_written.saturating_add(n); } - Err(e) if e.kind() == std::io::ErrorKind::Interrupted => { + Err(error) if error.kind() == io::ErrorKind::Interrupted => { // Interrupted by a signal, just retry. - continue; } Err(_) => { // Any other error is treated as the connection being no longer writable. @@ -131,10 +172,10 @@ pub fn serve_output_on_stream( // Before draining the buffer, check if the connection is still active (for `stderr` reconnect support). // If the read monitoring thread detected a disconnect, we should NOT drain the buffer to prevent data loss. - if let ServingBehavior::DoNotKillChildOnDisconnect(ref active) = serving_behavior + if let ServingBehavior::DoNotKillChildOnDisconnect(active) = serving_behavior && !active.load(Ordering::Acquire) { - eprintln!( + info!( "[{label}] Connection no longer active (detected by monitoring thread); exiting without draining buffer to prevent data loss" ); return Ok(()); @@ -143,7 +184,7 @@ pub fn serve_output_on_stream( let mut guard = output_state .state .lock() - .expect("Failed to lock stdout state"); + .map_err(|error| anyhow::anyhow!("Failed to lock stdout state for {label}: {error}"))?; // Since we copied the buffer, there may have been new output produced while we were writing to the stream. We // only remove the number of bytes that we successfully wrote, so that any new output will still be in the buffer @@ -155,13 +196,13 @@ pub fn serve_output_on_stream( // We treat this as the connection being no longer writable and exit the loop (and potentially kill the // child process, depending on the serving behavior). if matches!(serving_behavior, ServingBehavior::KillChildOnDisconnect) { - let _ = control_tx.send(ControlMessage::KillChild); + let _result = control_tx.send(ControlMessage::KillChild); } return Ok(()); } if guard.eof && guard.buffer.is_empty() { - eprintln!("[{label}] EOF reached; closing client connection"); + debug!("[{label}] EOF reached; closing client connection"); return Ok(()); } } @@ -173,43 +214,55 @@ mod tests { use std::io::Cursor; #[test] - fn test_pump_output_to_state_empty_input() { + fn test_pump_output_to_state_empty_input() -> Result<(), anyhow::Error> { let state = Arc::new(NotifyableOutputState::new()); let input = Cursor::new(Vec::::new()); - let result = pump_output_to_state(input, Arc::clone(&state), "test"); - assert!(result.is_ok()); + pump_output_to_state(input, &state, "test")?; - let guard = state.state.lock().unwrap(); + let guard = state + .state + .lock() + .map_err(|error| anyhow::anyhow!("Failed to lock output state for test: {error}"))?; assert!(guard.buffer.is_empty()); assert!(guard.eof); + drop(guard); // Only needed to satisfy Clippy ¯\_(ツ)_/¯ + Ok(()) } #[test] - fn test_pump_output_to_state_single_chunk() { + fn test_pump_output_to_state_single_chunk() -> Result<(), anyhow::Error> { let state = Arc::new(NotifyableOutputState::new()); let data = b"Hello, World!"; let input = Cursor::new(data.to_vec()); - let result = pump_output_to_state(input, Arc::clone(&state), "test"); - assert!(result.is_ok()); + pump_output_to_state(input, &state, "test")?; - let guard = state.state.lock().unwrap(); + let guard = state + .state + .lock() + .map_err(|error| anyhow::anyhow!("Failed to lock output state for test: {error}"))?; assert_eq!(guard.buffer, data); assert!(guard.eof); + drop(guard); // Only needed to satisfy Clippy ¯\_(ツ)_/¯ + Ok(()) } #[test] - fn test_pump_output_to_state_multiple_chunks() { + fn test_pump_output_to_state_multiple_chunks() -> Result<(), anyhow::Error> { let state = Arc::new(NotifyableOutputState::new()); - let data = vec![0u8; 16384]; // Larger than buffer size (8192). + let data = vec![0_u8; 16384]; // Larger than buffer size (8192). let input = Cursor::new(data.clone()); - let result = pump_output_to_state(input, Arc::clone(&state), "test"); - assert!(result.is_ok()); + pump_output_to_state(input, &state, "test")?; - let guard = state.state.lock().unwrap(); + let guard = state + .state + .lock() + .map_err(|error| anyhow::anyhow!("Failed to lock output state for test: {error}"))?; assert_eq!(guard.buffer, data); assert!(guard.eof); + drop(guard); // Only needed to satisfy Clippy ¯\_(ツ)_/¯ + Ok(()) } } diff --git a/src/servers.rs b/src/servers.rs new file mode 100644 index 0000000..9da26aa --- /dev/null +++ b/src/servers.rs @@ -0,0 +1,19 @@ +//! TCP server implementations for protocol, stderr, and health endpoints. + +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) mod health; + +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) mod protocol; + +#[expect( + clippy::redundant_pub_crate, + reason = "Linting conflict with `rustc::unreachable_pub`." +)] +pub(crate) mod stderr; diff --git a/src/servers/health.rs b/src/servers/health.rs index 3fba5d3..ecdbfa9 100644 --- a/src/servers/health.rs +++ b/src/servers/health.rs @@ -1,18 +1,20 @@ +//! Health check TCP server that accepts and immediately drops connections. + use std::net::TcpListener; +use tracing::warn; + /// Waits for clients to connect on the `health` port, and immediately drops any connections. The existence /// of a successful connection is used by the client as a health check for whether the process is alive. -pub fn health_server(listener: TcpListener) -> Result<(), anyhow::Error> { +pub(crate) fn health_server(listener: &TcpListener) { for stream in listener.incoming() { match stream { Ok(_stream) => { // Immediately drop it; successful connect is enough. } - Err(e) => { - eprintln!("[health] accept failed: {e}"); + Err(error) => { + warn!("[health] accept failed: {error}"); } } } - - Ok(()) } diff --git a/src/servers/mod.rs b/src/servers/mod.rs deleted file mode 100644 index 732f3e0..0000000 --- a/src/servers/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod health; -pub mod protocol; -pub mod stderr; diff --git a/src/servers/protocol.rs b/src/servers/protocol.rs index 6e989c2..0d7b24d 100644 --- a/src/servers/protocol.rs +++ b/src/servers/protocol.rs @@ -1,42 +1,55 @@ +//! Protocol TCP server for bidirectional `stdin`/`stdout` forwarding. + use std::{ + fs, io::{Read, Write}, net::{TcpListener, TcpStream}, sync::{Arc, mpsc}, thread, }; +use tracing::info; + use crate::{ control::ControlMessage, output::{NotifyableOutputState, ServingBehavior, serve_output_on_stream}, }; +/// Forwards data from a TCP client stream to the child process’s `stdin`. +/// +/// Reads from the client and writes to child `stdin` until the client disconnects +/// or an error occurs. Sends a kill signal on disconnection. fn forward_stream_data_to_child_process( mut stream: TcpStream, - mut child_stdin: std::fs::File, - control_tx: mpsc::Sender, + mut child_stdin: fs::File, + control_tx: &mpsc::Sender, ) -> Result<(), anyhow::Error> { - let mut read_buffer = [0u8; 8192]; + let mut read_buffer = [0_u8; 8192]; loop { let num_bytes_read = match stream.read(&mut read_buffer) { Ok(0) => { - eprintln!("[protocol] client disconnected; terminating child process"); - let _ = control_tx.send(ControlMessage::KillChild); + info!("[protocol] client disconnected; terminating child process"); + let _result = control_tx.send(ControlMessage::KillChild); return Ok(()); } Ok(n) => n, - Err(e) => { - let _ = control_tx.send(ControlMessage::KillChild); - return Err(anyhow::anyhow!("Failed to read from protocol client: {e}")); + Err(error) => { + let _result = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!( + "Failed to read from protocol client: {error}" + )); } }; - if let Err(e) = child_stdin.write_all(&read_buffer[..num_bytes_read]) { - let _ = control_tx.send(ControlMessage::KillChild); - return Err(anyhow::anyhow!("Failed to write to child stdin: {e}")); + if let Err(error) = + child_stdin.write_all(read_buffer.get(..num_bytes_read).unwrap_or_default()) + { + let _result = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!("Failed to write to child stdin: {error}")); } - if let Err(e) = child_stdin.flush() { - let _ = control_tx.send(ControlMessage::KillChild); - return Err(anyhow::anyhow!("Failed to flush child stdin: {e}")); + if let Err(error) = child_stdin.flush() { + let _result = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!("Failed to flush child stdin: {error}")); } } } @@ -45,45 +58,51 @@ fn forward_stream_data_to_child_process( /// client and the child process. This function spawns two threads: one for forwarding data from /// the client to the child process’s `stdin`, and another for forwarding data from the child /// process’s `stdout` to the client. -pub fn protocol_server( - listener: TcpListener, +pub(crate) fn protocol_server( + listener: &TcpListener, stdout_state: Arc, - child_stdin: std::fs::File, + child_stdin: fs::File, control_tx: mpsc::Sender, ) -> Result<(), anyhow::Error> { // We only accept a single (i.e., the first) client connection on the protocol port. // When the client disconnects, we terminate the child process and exit the server. let (stdin_thread, stdout_thread) = match listener.accept() { Ok((stream, address)) => { - eprintln!("[protocol] client connected from {address}"); + info!("[protocol] client connected from {address}"); let cloned_stream = stream.try_clone()?; let cloned_control_tx = control_tx.clone(); ( thread::spawn(move || { - let _ = forward_stream_data_to_child_process( + drop(forward_stream_data_to_child_process( cloned_stream, child_stdin, - cloned_control_tx, - ); + &cloned_control_tx, + )); }), thread::spawn(move || { - let _ = serve_output_on_stream( + drop(serve_output_on_stream( stream, - Arc::clone(&stdout_state), - control_tx, - ServingBehavior::KillChildOnDisconnect, + &stdout_state, + &control_tx, + &ServingBehavior::KillChildOnDisconnect, "protocol", - ); + )); }), ) } - Err(e) => { - return Err(anyhow::anyhow!("Failed to accept client connection: {e}")); + Err(error) => { + return Err(anyhow::anyhow!( + "Failed to accept client connection: {error}" + )); } }; - stdin_thread.join().expect("Failed to join stdin thread"); - stdout_thread.join().expect("Failed to join stdout thread"); + stdin_thread + .join() + .map_err(|error| anyhow::anyhow!("Stdin forwarding thread panicked: {error:?}"))?; + stdout_thread + .join() + .map_err(|error| anyhow::anyhow!("Stdout forwarding thread panicked: {error:?}"))?; Ok(()) } diff --git a/src/servers/stderr.rs b/src/servers/stderr.rs index 379ac61..29d4803 100644 --- a/src/servers/stderr.rs +++ b/src/servers/stderr.rs @@ -1,3 +1,5 @@ +//! `stderr` TCP server with reconnection support and output buffering. + use std::{ io::Read, net::{TcpListener, TcpStream}, @@ -9,6 +11,8 @@ use std::{ thread, }; +use tracing::{debug, info, warn}; + use crate::{ control::ControlMessage, output::{NotifyableOutputState, ServingBehavior, serve_output_on_stream}, @@ -19,25 +23,27 @@ use crate::{ /// When disconnection is detected, the atomic flag is cleared to allow new connections. fn monitor_stderr_client_connection( mut stream: TcpStream, - has_active_connection: Arc, + has_active_connection: &Arc, ) -> Result<(), anyhow::Error> { - let mut read_buffer = [0u8; 1]; + let mut read_buffer = [0_u8; 1]; loop { match stream.read(&mut read_buffer) { Ok(0) => { // EOF; client disconnected gracefully. - eprintln!("[stderr] client disconnect detected"); + debug!("[stderr] client disconnect detected"); has_active_connection.store(false, Ordering::Release); return Ok(()); } Ok(_) => { // Ignore any data sent by client (unexpected but harmless). } - Err(e) => { + Err(error) => { // Error reading; treat as disconnection. - eprintln!("[stderr] read error (client likely disconnected): {e}"); + debug!("[stderr] read error (client likely disconnected): {error}"); has_active_connection.store(false, Ordering::Release); - return Err(anyhow::anyhow!("Failed to read from stderr client: {e}")); + return Err(anyhow::anyhow!( + "Failed to read from stderr client: {error}" + )); } } } @@ -47,10 +53,10 @@ fn monitor_stderr_client_connection( /// to the first client that connects. If that client disconnects, we wait for the next client to connect /// and serve the current `stderr` output to them instead, and so on. The function spawns two threads /// for each client connection: one for monitoring disconnection and one for writing output. -pub fn stderr_server( - listener: TcpListener, - stderr_state: Arc, - control_tx: mpsc::Sender, +pub(crate) fn stderr_server( + listener: &TcpListener, + stderr_state: &Arc, + control_tx: &mpsc::Sender, ) -> Result<(), anyhow::Error> { // We allow reconnects on the `stderr` port, but only one client at a time. When a client disconnects, // we simply wait for the next one to connect. @@ -63,12 +69,12 @@ pub fn stderr_server( .is_ok() { // Atomic value has been successfully changed from `false` to `true`. - eprintln!("[stderr] client connected from {}", stream.peer_addr()?); + info!("[stderr] client connected from {}", stream.peer_addr()?); let connection_monitoring_stream = match stream.try_clone() { - Ok(s) => s, - Err(e) => { - eprintln!("[stderr] failed to clone stream: {e}"); + Ok(stream) => stream, + Err(error) => { + warn!("[stderr] failed to clone stream: {error}"); has_active_connection.store(false, Ordering::Release); continue; } @@ -77,42 +83,42 @@ pub fn stderr_server( let has_active_connection_clone = Arc::clone(&has_active_connection); let has_active_connection_monitor = Arc::clone(&has_active_connection); let has_active_connection_write = Arc::clone(&has_active_connection); - let stderr_state = Arc::clone(&stderr_state); + let stderr_state = Arc::clone(stderr_state); let control_tx = control_tx.clone(); // Spawn read monitoring thread to detect disconnection proactively. thread::spawn(move || { - let _ = monitor_stderr_client_connection( + drop(monitor_stderr_client_connection( connection_monitoring_stream, - has_active_connection_monitor, - ); + &has_active_connection_monitor, + )); }); // Spawn write thread to serve stderr output. thread::spawn(move || { - let _ = serve_output_on_stream( + drop(serve_output_on_stream( stream, - stderr_state, - control_tx, - ServingBehavior::DoNotKillChildOnDisconnect(Arc::clone( + &stderr_state, + &control_tx, + &ServingBehavior::DoNotKillChildOnDisconnect(Arc::clone( &has_active_connection_write, )), "stderr", - ); + )); // When the write thread finishes, also clear the connection flag // (idempotent if the read thread already did this). has_active_connection_clone.store(false, Ordering::Release); }); } else { // Atomic value was already `true`, so there is already an active connection. - eprintln!( + info!( "[stderr] client connected from {}, but another client is already connected; rejecting connection", stream.peer_addr()? ); } } - Err(e) => { - eprintln!("[stderr] accept failed: {e}"); + Err(error) => { + warn!("[stderr] accept failed: {error}"); } } } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index a7a2c75..ce73f38 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -1,13 +1,31 @@ +//! Integration tests for stdioxide TCP forwarder functionality. +#![cfg(test)] +#![allow( + clippy::panic_in_result_fn, + reason = "Using `assert!()`s is idiomatic, but we need to return `Result`s to be able to return I/O-related errors." +)] + use std::{ collections::HashSet, - io::{Read, Write}, - net::{TcpListener, TcpStream}, + env, + fs::read_to_string, + io::{self, Read, Write}, + net::{Shutdown, TcpListener, TcpStream}, process::{Child, Command, Stdio}, + //string::String, sync::{LazyLock, Mutex}, thread, - time::Duration, + time::{Duration, Instant}, }; +// Acknowledge available dev-dependencies not used in this test file. +use clap as _; +use lsp_types as _; +use stdioxide as _; +use subprocess as _; +use tracing as _; +use tracing_subscriber as _; + use crate::lsp_client::LspClient; mod test_utils; @@ -21,7 +39,7 @@ static ALLOCATED_PORTS_REGISTRY: LazyLock>> = /// Find an available port by binding to port 0 and letting the OS assign one. /// Returns the port number that was assigned. /// Never returns default ports (7000, 7001, 7002) to avoid conflicts with `test_default_port_values()`. -fn find_available_port() -> u16 { +fn find_available_port() -> Result { loop { // Note: This function binds to a port to find out whether it’s available. But this exposes a race // condition if actions happen in the following order: @@ -37,14 +55,14 @@ fn find_available_port() -> u16 { // spawning the forwarder, so if it fails to bind to the port, it will try again with a different // shortly after. let port = TcpListener::bind("127.0.0.1:0") - .expect("Failed to bind to find available port") + .map_err(|error| anyhow::anyhow!("Failed to bind to find available port: {error}"))? .local_addr() - .expect("Failed to get local address") + .map_err(|error| anyhow::anyhow!("Failed to get local address: {error}"))? .port(); // Skip default ports to avoid conflicts with `test_default_port_values()`. if !(7000..=7002).contains(&port) { - return port; + return Ok(port); } } } @@ -56,6 +74,10 @@ struct PortGuard { impl Drop for PortGuard { fn drop(&mut self) { + #[expect( + clippy::unwrap_used, + reason = "This is the destructor and we cannot really do something else if locking fails, so it’s acceptable to unwrap here." + )] let mut registry = ALLOCATED_PORTS_REGISTRY.lock().unwrap(); for port in &self.ports { registry.remove(port); @@ -78,12 +100,12 @@ impl AllocatedPorts { /// Allocate three available ports and register them globally. /// Retries if the OS-assigned ports are already allocated by another test. /// Multiple tests can allocate ports concurrently without blocking each other. - fn new() -> Self { + fn new() -> Result { loop { // Find candidate ports from the OS - let p1 = find_available_port(); - let p2 = find_available_port(); - let p3 = find_available_port(); + let p1 = find_available_port()?; + let p2 = find_available_port()?; + let p3 = find_available_port()?; // `find_available_port()` could return a port value that was previously found out to // be free in the context of a different test, but has not been bound yet. However, it would @@ -91,20 +113,24 @@ impl AllocatedPorts { // that such a port is *really* available. // Try to reserve them in the global registry. - let mut registry = ALLOCATED_PORTS_REGISTRY.lock().unwrap(); - if !registry.contains(&p1) && !registry.contains(&p2) && !registry.contains(&p3) { + let mut registry = ALLOCATED_PORTS_REGISTRY.lock().map_err(|error| { + anyhow::anyhow!("Failed to lock allocated ports registry: {error}") + })?; + let ports_are_available = [p1, p2, p3].iter().all(|port| !registry.contains(port)); + if ports_are_available { registry.insert(p1); registry.insert(p2); registry.insert(p3); + drop(registry); // Drop “early” to satisfy Clippy. - return Self { + return Ok(Self { protocol_port: p1, stderr_port: p2, health_port: p3, _guard: PortGuard { ports: [p1, p2, p3], }, - }; + }); } // If any port is already allocated, try again } @@ -136,33 +162,31 @@ impl TestForwarder { /// Start a new `stdioxide` forwarder with the given command and arguments. /// Automatically allocates unique available ports for this test. /// Retries if port binding fails (e.g., if a port was grabbed between discovery and binding). - fn start(command: &str, args: &[&str]) -> Self { + fn start(command: &str, args: &[&str]) -> Result { const MAX_RETRIES: usize = 3; let mut last_error = None; for attempt in 0..MAX_RETRIES { // Allocate ports--registered in global registry. - let ports = AllocatedPorts::new(); + let ports = AllocatedPorts::new()?; // Try to start with these ports. match Self::try_start_with_ports(command, args, ports) { - Ok(forwarder) => return forwarder, - Err(e) => { - last_error = Some(e); + Ok(forwarder) => return Ok(forwarder), + Err(error) => { + last_error = Some(error); if attempt < MAX_RETRIES - 1 { // Retry with new ports thread::sleep(Duration::from_millis(100)); - continue; } } } } - panic!( - "Failed to start forwarder after {} attempts. Last error: {}", - MAX_RETRIES, - last_error.unwrap() - ); + let last_error = last_error.unwrap_or_else(|| "Unknown error".to_owned()); + Err(anyhow::anyhow!( + "Failed to start forwarder after {MAX_RETRIES} attempts. Last error: {last_error}", + )) } /// Try to start a new `stdioxide` forwarder with pre-allocated ports. @@ -173,14 +197,14 @@ impl TestForwarder { ports: AllocatedPorts, ) -> Result { let process = Self::spawn_process(command, args, &ports) - .map_err(|e| format!("Failed to spawn process: {}", e))?; + .map_err(|error| format!("Failed to spawn process: {error}"))?; let forwarder = Self { process, ports }; // Wait for the forwarder to bind to the ports forwarder .try_wait_for_ready() - .map_err(|e| format!("Failed to become ready: {}", e))?; + .map_err(|error| format!("Failed to become ready: {error}"))?; // Ports remain in the forwarder and will be released when it’s dropped. Ok(forwarder) @@ -188,15 +212,11 @@ impl TestForwarder { /// Internal helper to spawn the forwarder process. /// Returns the spawned Child process or an error if spawning fails (e.g., due to port conflicts). - fn spawn_process( - command: &str, - args: &[&str], - ports: &AllocatedPorts, - ) -> std::io::Result { + fn spawn_process(command: &str, args: &[&str], ports: &AllocatedPorts) -> io::Result { // Get the path to the `stdioxide` binary. // In integration tests, we need to use the binary from the target directory. - let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") - .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + let bin_path = env::var("CARGO_BIN_EXE_stdioxide") + .unwrap_or_else(|_| "target/debug/stdioxide".to_owned()); let mut cmd = Command::new(&bin_path); cmd.arg("--protocol-port") @@ -219,7 +239,7 @@ impl TestForwarder { /// Try to wait for the forwarder to be ready by attempting to connect to the health port. /// Returns an error if the forwarder doesn’t become ready in time. - fn try_wait_for_ready(&self) -> Result<(), String> { + fn try_wait_for_ready(&self) -> Result<(), anyhow::Error> { const NUM_ATTEMPTS: usize = 30; let mut last_error = None; @@ -227,12 +247,14 @@ impl TestForwarder { match TcpStream::connect_timeout( &format!("127.0.0.1:{}", self.ports.health_port()) .parse() - .unwrap(), + .map_err(|error| { + anyhow::anyhow!("Failed to parse health port address: {error}") + })?, Duration::from_millis(100), ) { Ok(_) => return Ok(()), - Err(e) => { - last_error = Some(e); + Err(error) => { + last_error = Some(error); if attempt < NUM_ATTEMPTS - 1 { thread::sleep(Duration::from_millis(50)); } @@ -240,36 +262,39 @@ impl TestForwarder { } } - Err(format!( - "Forwarder did not become ready in time on port {}. Last error: {}", + let last_error = + last_error.map_or_else(|| "Unknown error".to_owned(), |error| format!("{error}")); + Err(anyhow::anyhow!( + "Forwarder did not become ready in time on port {}. Last error: {last_error}", self.ports.health_port(), - last_error.unwrap() )) } /// Connect to the protocol port. - fn connect_protocol(&self) -> TcpStream { - self.connect_with_retry(self.ports.protocol_port(), "protocol") + fn connect_protocol(&self) -> Result { + Self::connect_with_retry(self.ports.protocol_port(), "protocol") } /// Connect to the `stderr` port. - fn connect_stderr(&self) -> TcpStream { - self.connect_with_retry(self.ports.stderr_port(), "stderr") + fn connect_stderr(&self) -> Result { + Self::connect_with_retry(self.ports.stderr_port(), "stderr") } /// Connect to the health port. - fn connect_health(&self) -> TcpStream { - self.connect_with_retry(self.ports.health_port(), "health") + fn connect_health(&self) -> Result { + Self::connect_with_retry(self.ports.health_port(), "health") } /// Connect to a port with retries. - fn connect_with_retry(&self, port: u16, label: &str) -> TcpStream { + fn connect_with_retry(port: u16, label: &str) -> Result { const NUM_ATTEMPTS: usize = 20; for attempt in 0..NUM_ATTEMPTS { match TcpStream::connect(("127.0.0.1", port)) { - Ok(stream) => return stream, - Err(e) if attempt == NUM_ATTEMPTS - 1 => { - panic!("Failed to connect to {} port {}: {}", label, port, e); + Ok(stream) => return Ok(stream), + Err(error) if attempt == NUM_ATTEMPTS - 1 => { + return Err(anyhow::anyhow!( + "Failed to connect to {label} port {port} after {NUM_ATTEMPTS} attempts: {error}" + )); } Err(_) => { thread::sleep(Duration::from_millis(50)); @@ -295,36 +320,36 @@ impl TestForwarder { impl Drop for TestForwarder { fn drop(&mut self) { // Clean up: kill the process if it’s still running. - let _ = self.process.kill(); - let _ = self.process.wait(); + drop(self.process.kill()); + drop(self.process.wait()); } } /// Helper function to read from a stream with a timeout. -fn read_with_timeout(stream: &mut TcpStream, buffer: &mut [u8]) -> std::io::Result { +fn read_with_timeout(stream: &mut TcpStream, buffer: &mut [u8]) -> io::Result { stream.set_read_timeout(Some(Duration::from_secs(5)))?; stream.read(buffer) } /// Helper function to read all available data from a stream up to a timeout. -fn read_all_available(stream: &mut TcpStream, timeout: Duration) -> Vec { +fn read_all_available(stream: &mut TcpStream, timeout: Duration) -> Result, anyhow::Error> { let mut result = Vec::new(); - let mut buffer = [0u8; 8192]; + let mut buffer = [0_u8; 8192]; stream .set_read_timeout(Some(timeout)) - .expect("Failed to set read timeout"); + .map_err(|error| anyhow::anyhow!("Failed to set read timeout: {error}"))?; loop { match stream.read(&mut buffer) { Ok(0) => break, - Ok(n) => result.extend_from_slice(&buffer[..n]), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, - Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, + Ok(n) => result.extend_from_slice(buffer.get(..n).unwrap_or_default()), + Err(error) if error.kind() == io::ErrorKind::WouldBlock => break, + Err(error) if error.kind() == io::ErrorKind::TimedOut => break, Err(_) => break, } } - result + Ok(result) } // ============================================================================ @@ -332,65 +357,68 @@ fn read_all_available(stream: &mut TcpStream, timeout: Duration) -> Vec { // ============================================================================ #[test] -fn test_forwarder_starts_arbitrary_child_process() { +fn test_forwarder_starts_arbitrary_child_process() -> Result<(), anyhow::Error> { // * [x] A standalone forwarder executable can be started that launches an arbitrary child process. let (cmd, args) = sleep_cmd(5); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // If we got here, the forwarder started successfully. // The forwarder should be ready (health port should be accessible). - assert!(forwarder.connect_health().peer_addr().is_ok()); + forwarder.connect_health()?.peer_addr()?; + Ok(()) } #[test] -fn test_forwarder_passes_arguments_unchanged() { +fn test_forwarder_passes_arguments_unchanged() -> Result<(), anyhow::Error> { // * [x] The forwarder passes command-line arguments through to the child process unchanged. // Use a command that outputs arguments and then waits, so we have time to connect. let (cmd, args) = echo_args_cmd(&["-n", "test", "with spaces", "--flag"]); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; - let mut stream = forwarder.connect_protocol(); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let mut stream = forwarder.connect_protocol()?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; // `bash` should output all the arguments. let output_str = String::from_utf8_lossy(&output); assert!(output_str.contains("test")); assert!(output_str.contains("with spaces")); assert!(output_str.contains("--flag")); + Ok(()) } #[test] -fn test_child_process_configurable_externally() { +fn test_child_process_configurable_externally() -> Result<(), anyhow::Error> { // * [x] The child process to launch can be configured externally. // Test with different commands to verify external configuration works. let (cmd1, args1) = echo_with_sleep_cmd("first", 5); - let args1_refs: Vec<&str> = args1.iter().map(|s| s.as_str()).collect(); - let forwarder1 = TestForwarder::start(cmd1, &args1_refs); - let mut stream1 = forwarder1.connect_protocol(); - let output1 = read_all_available(&mut stream1, Duration::from_millis(500)); + let args1_refs: Vec<&str> = args1.iter().map(String::as_str).collect(); + let forwarder1 = TestForwarder::start(cmd1, &args1_refs)?; + let mut stream1 = forwarder1.connect_protocol()?; + let output1 = read_all_available(&mut stream1, Duration::from_millis(500))?; assert!(String::from_utf8_lossy(&output1).contains("first")); let (cmd2, args2) = echo_with_sleep_cmd("second", 5); - let args2_refs: Vec<&str> = args2.iter().map(|s| s.as_str()).collect(); - let forwarder2 = TestForwarder::start(cmd2, &args2_refs); - let mut stream2 = forwarder2.connect_protocol(); - let output2 = read_all_available(&mut stream2, Duration::from_millis(500)); + let args2_refs: Vec<&str> = args2.iter().map(String::as_str).collect(); + let forwarder2 = TestForwarder::start(cmd2, &args2_refs)?; + let mut stream2 = forwarder2.connect_protocol()?; + let output2 = read_all_available(&mut stream2, Duration::from_millis(500))?; assert!(String::from_utf8_lossy(&output2).contains("second")); + Ok(()) } #[test] -fn test_forwarder_exits_when_child_exits() { +fn test_forwarder_exits_when_child_exits() -> Result<(), anyhow::Error> { // * [x] When the child process exits for any reason, the forwarder also terminates. // Use a command that runs briefly and then exits. let (cmd, args) = short_lived_cmd("test", 0); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let mut forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let mut forwarder = TestForwarder::start(cmd, &args_refs)?; // Connect to protocol port to ensure we’re monitoring the forwarder. let _stream = forwarder.connect_protocol(); @@ -403,57 +431,63 @@ fn test_forwarder_exits_when_child_exits() { forwarder.wait_for_exit(), "Forwarder should exit when child exits" ); + Ok(()) } #[test] -fn test_forwarder_exposes_three_tcp_ports() { +fn test_forwarder_exposes_three_tcp_ports() -> Result<(), anyhow::Error> { // * [x] The forwarder exposes three TCP ports: // * [x] a **protocol port** // * [x] an **stderr port** // * [x] a **health port** let (cmd, args) = sleep_cmd(10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Verify all three ports are accessible. - // Note: The `connect_*()` methods already `panic!()` if connection fails, so the real - // accessibility check happens *there*. The `.peer_addr().is_ok()` is redundant but - // serves as documentation. + // Note: The `connect_*()` methods already return an `Err` if connection fails (which + // gets propagated), so the real accessibility check happens *there*. The + // `.peer_addr().is_ok()` is redundant but serves as documentation. assert!( - forwarder.connect_protocol().peer_addr().is_ok(), + forwarder.connect_protocol()?.peer_addr().is_ok(), "Protocol port should be accessible" ); assert!( - forwarder.connect_stderr().peer_addr().is_ok(), + forwarder.connect_stderr()?.peer_addr().is_ok(), "Stderr port should be accessible" ); assert!( - forwarder.connect_health().peer_addr().is_ok(), + forwarder.connect_health()?.peer_addr().is_ok(), "Health port should be accessible" ); + Ok(()) } #[test] -fn test_default_port_values() { +fn test_default_port_values() -> Result<(), anyhow::Error> { // * [x] The default port values are: // * [x] `7000` for the protocol port // * [x] `7001` for the stderr port // * [x] `7002` for the health port + const NUM_ATTEMPTS: usize = 30; // Test setup: Ensure default ports are available. // If they’re not, this is an environment issue, not a test failure. - for port in 7000..=7002 { + for port in 7000_u16..=7002_u16 { let error_message = format!( "TEST SETUP FAILED: Default port {port} is not available. This is an environment issue, not a test failure." ); - let _ = TcpListener::bind(format!("127.0.0.1:{port}")).expect(&error_message); + drop( + TcpListener::bind(format!("127.0.0.1:{port}")) + .map_err(|error| anyhow::anyhow!("{error_message}: {error}"))?, + ); // Port is immediately released here. } // Launch `stdioxide` *without* specifying ports to verify it uses the defaults. - let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") - .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + let bin_path = + env::var("CARGO_BIN_EXE_stdioxide").unwrap_or_else(|_| "target/debug/stdioxide".to_owned()); let (sleep_command, sleep_args) = sleep_cmd(10); let mut cmd = Command::new(&bin_path); @@ -464,17 +498,15 @@ fn test_default_port_values() { cmd.stderr(Stdio::piped()); cmd.stdout(Stdio::piped()); - let mut process = cmd.spawn().expect("Failed to start stdioxide"); + let mut process = cmd + .spawn() + .map_err(|error| anyhow::anyhow!("Failed to start stdioxide: {error}"))?; // Wait for the forwarder to be ready by connecting to the default health port. let mut connected = false; - const NUM_ATTEMPTS: usize = 30; for _ in 0..NUM_ATTEMPTS { - if TcpStream::connect_timeout( - &"127.0.0.1:7002".parse().unwrap(), - Duration::from_millis(100), - ) - .is_ok() + if TcpStream::connect_timeout(&"127.0.0.1:7002".parse()?, Duration::from_millis(100)) + .is_ok() { connected = true; break; @@ -487,7 +519,7 @@ fn test_default_port_values() { ); // Verify we can connect to all three default ports. - for port in 7000..=7002 { + for port in 7000_u16..=7002_u16 { assert!( TcpStream::connect(format!("127.0.0.1:{port}")).is_ok(), "Should connect to default port {port}" @@ -495,23 +527,25 @@ fn test_default_port_values() { } // Clean up. - let _ = process.kill(); - let _ = process.wait(); + drop(process.kill()); + drop(process.wait()); + Ok(()) } #[test] -fn test_port_override_via_environment_variables() { +fn test_port_override_via_environment_variables() -> Result<(), anyhow::Error> { // * [x] All three port numbers can be overridden via environment variables. + const NUM_ATTEMPTS: usize = 30; // Allocate unique ports to avoid conflicts. - let ports = AllocatedPorts::new(); + let ports = AllocatedPorts::new()?; let custom_protocol = ports.protocol_port(); let custom_stderr = ports.stderr_port(); let custom_health = ports.health_port(); // Launch `stdioxide` with environment variables (NOT command-line args) to test env var override. - let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") - .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + let bin_path = + env::var("CARGO_BIN_EXE_stdioxide").unwrap_or_else(|_| "target/debug/stdioxide".to_owned()); let (sleep_command, sleep_args) = sleep_cmd(10); let mut cmd = Command::new(&bin_path); @@ -524,14 +558,15 @@ fn test_port_override_via_environment_variables() { } cmd.stderr(Stdio::piped()).stdout(Stdio::piped()); - let mut process = cmd.spawn().expect("Failed to start stdioxide"); + let mut process = cmd + .spawn() + .map_err(|error| anyhow::anyhow!("Failed to start stdioxide: {error}"))?; // Wait for the forwarder to be ready by connecting to the custom health port. let mut connected = false; - const NUM_ATTEMPTS: usize = 30; for _ in 0..NUM_ATTEMPTS { if TcpStream::connect_timeout( - &format!("127.0.0.1:{}", custom_health).parse().unwrap(), + &format!("127.0.0.1:{custom_health}").parse()?, Duration::from_millis(100), ) .is_ok() @@ -543,8 +578,7 @@ fn test_port_override_via_environment_variables() { } assert!( connected, - "Forwarder should be ready on custom health port {}", - custom_health + "Forwarder should be ready on custom health port {custom_health}", ); // Verify we can connect to all three custom ports. @@ -562,134 +596,150 @@ fn test_port_override_via_environment_variables() { ); // Clean up. - let _ = process.kill(); - let _ = process.wait(); + drop(process.kill()); + drop(process.wait()); + Ok(()) } #[test] -fn test_stdout_sent_over_protocol_port() { +fn test_stdout_sent_over_protocol_port() -> Result<(), anyhow::Error> { // * [x] The forwarder sends the child process’s `stdout` stream over the protocol port. let (cmd, args) = echo_with_sleep_cmd("Hello from stdout", 5); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; - let mut stream = forwarder.connect_protocol(); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let mut stream = forwarder.connect_protocol()?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; let output_str = String::from_utf8_lossy(&output); assert!(output_str.contains("Hello from stdout")); + Ok(()) } #[test] -fn test_stdin_received_on_protocol_port() { +fn test_stdin_received_on_protocol_port() -> Result<(), anyhow::Error> { // * [x] The forwarder receives input for the child process’s `stdin` stream on the protocol port. // * [x] Data received on the protocol port is forwarded to the child process’s `stdin` while the connection is active. let (cmd, args) = cat_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; - let mut stream = forwarder.connect_protocol(); + let mut stream = forwarder.connect_protocol()?; // Send data to `stdin` via the protocol port. stream .write_all(b"test input\n") - .expect("Failed to write to protocol port"); - stream.flush().expect("Failed to flush"); + .map_err(|error| anyhow::anyhow!("Failed to write to protocol port: {error}"))?; + stream + .flush() + .map_err(|error| anyhow::anyhow!("Failed to flush protocol port: {error}"))?; // Read back the echoed output from `stdout`. // Increased timeout to account for PowerShell startup on Windows - let output = read_all_available(&mut stream, Duration::from_millis(1500)); + let output = read_all_available(&mut stream, Duration::from_millis(1500))?; let output_str = String::from_utf8_lossy(&output); assert!(output_str.contains("test input")); + Ok(()) } #[test] -fn test_stderr_sent_over_stderr_port() { +fn test_stderr_sent_over_stderr_port() -> Result<(), anyhow::Error> { // * [x] The forwarder sends the child process’s `stderr` stream over the `stderr` port. // Use a `bash` command that writes to `stderr` and then waits. let (cmd, args) = stderr_echo_with_sleep_cmd("error message", 5); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - - let mut stream = forwarder.connect_stderr(); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; + let mut stream = forwarder.connect_stderr()?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; let output_str = String::from_utf8_lossy(&output); assert!(output_str.contains("error message")); + Ok(()) } #[test] -fn test_protocol_port_single_client_only() { +fn test_protocol_port_single_client_only() -> Result<(), anyhow::Error> { // * [x] The protocol port allows at most one active client connection at a time. let (cmd, args) = loop_stdin_to_stdout_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // First client connects successfully. - let mut stream1 = forwarder.connect_protocol(); + let mut stream1 = forwarder.connect_protocol()?; // Verify first client works. - stream1.write_all(b"test\n").expect("Failed to write"); - stream1.flush().expect("Failed to flush"); + stream1 + .write_all(b"test\n") + .map_err(|error| anyhow::anyhow!("Failed to write to protocol port: {error}"))?; + stream1 + .flush() + .map_err(|error| anyhow::anyhow!("Failed to flush protocol port: {error}"))?; // Increased timeout to account for PowerShell startup and buffering on Windows - let output = read_all_available(&mut stream1, Duration::from_millis(1500)); + let output = read_all_available(&mut stream1, Duration::from_millis(1500))?; assert!(String::from_utf8_lossy(&output).contains("response")); // Second client connection attempt. // The protocol server only calls `accept()` once, so the second connection // succeeds at the TCP level (queued in backlog) but is never accepted/served. let mut stream2 = TcpStream::connect_timeout( - &format!("127.0.0.1:{}", forwarder.ports.protocol_port()) - .parse() - .unwrap(), + &format!("127.0.0.1:{}", forwarder.ports.protocol_port()).parse()?, Duration::from_millis(200), ) - .expect("Second client should connect successfully (TCP handshake completes)"); + .map_err(|error| { + anyhow::anyhow!( + "Second client should connect successfully (TCP handshake completes): {error}" + ) + })?; // The connection is established but never served--reading should timeout. stream2 .set_read_timeout(Some(Duration::from_millis(200))) - .expect("Should set read timeout"); + .map_err(|error| anyhow::anyhow!("Should set read timeout: {error}"))?; - let mut buf = [0u8; 100]; + let mut buf = [0_u8; 100]; let result = stream2.read(&mut buf); - assert!( - result.is_err() - && matches!( - result.as_ref().unwrap_err().kind(), - std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut - ), - "Second client should timeout reading (connection never served by protocol server)" - ); + if let Err(error) = &result { + assert!( + error.kind() == io::ErrorKind::WouldBlock || error.kind() == io::ErrorKind::TimedOut, + "Second client should timeout reading (connection never served by protocol server)" + ); + Ok(()) + } else { + anyhow::bail!( + "Second client should not receive data (connection never served by protocol server)" + ); + } } #[test] -fn test_stderr_port_single_client_only() { +fn test_stderr_port_single_client_only() -> Result<(), anyhow::Error> { // * [x] The `stderr` port allows at most one active client connection at a time. let (cmd, args) = continuous_stderr_loop_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // First client connects successfully. - let _stream1 = forwarder.connect_stderr(); + let _stream1 = forwarder.connect_stderr()?; // Second client should connect but be rejected. // According to the `stderr_server` implementation, it rejects additional connections. - let stream2 = forwarder.connect_stderr(); + let stream2 = forwarder.connect_stderr()?; // The second connection is made but immediately closed/rejected. // Try to read--should get no data or connection closed. let output = read_all_available( - &mut stream2.try_clone().unwrap(), + &mut stream2 + .try_clone() + .map_err(|error| anyhow::anyhow!("Failed to clone stream: {error}"))?, Duration::from_millis(500), - ); + )?; // The second client should not receive meaningful data since the first is still active. // In practice, the connection is accepted but dropped, so we should see minimal/no data. @@ -698,31 +748,31 @@ fn test_stderr_port_single_client_only() { output.is_empty() || output.len() < 100, "Second stderr client should not receive full stream data while first client is active" ); + Ok(()) } #[test] -fn test_health_port_multiple_clients() { +fn test_health_port_multiple_clients() -> Result<(), anyhow::Error> { // * [x] The health port allows multiple simultaneous client connections. let (cmd, args) = sleep_cmd(10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Connect multiple clients to the health port. - let _stream1 = forwarder.connect_health(); - let _stream2 = forwarder.connect_health(); - let _stream3 = forwarder.connect_health(); + let stream1 = forwarder.connect_health()?; + let stream2 = forwarder.connect_health()?; + let stream3 = forwarder.connect_health()?; // All connections should succeed. assert!( - _stream1.peer_addr().is_ok() - && _stream2.peer_addr().is_ok() - && _stream3.peer_addr().is_ok() + stream1.peer_addr().is_ok() && stream2.peer_addr().is_ok() && stream3.peer_addr().is_ok() ); + Ok(()) } #[test] -fn test_protocol_port_buffered_stdout_replay() { +fn test_protocol_port_buffered_stdout_replay() -> Result<(), anyhow::Error> { // * [x] When a client connects to the protocol port for the first time, it first receives all buffered `stdout` data // produced before the connection was established. // * [x] After the buffered `stdout` data has been sent, the client continues to receive newly produced `stdout` data @@ -730,31 +780,33 @@ fn test_protocol_port_buffered_stdout_replay() { // Use a script that produces output immediately and then waits. let (cmd, args) = multi_echo_stdout_cmd("buffered output", 1.0, "realtime output", 10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Now connect - buffering ensures we receive output produced before connection. thread::sleep(Duration::from_millis(100)); // Connect and we should receive the buffered output first. - let mut stream = forwarder.connect_protocol(); + let mut stream = forwarder.connect_protocol()?; // Read the output. - let output = read_all_available(&mut stream, Duration::from_secs(3)); + let read_timeout = Duration::from_secs(5); + let output = read_all_available(&mut stream, read_timeout)?; let output_str = String::from_utf8_lossy(&output); // Verify we got both buffered and realtime output. assert!(output_str.contains("buffered output")); assert!(output_str.contains("realtime output")); + Ok(()) } #[test] -fn test_protocol_disconnect_kills_child() { +fn test_protocol_disconnect_kills_child() -> Result<(), anyhow::Error> { // * [x] When a client disconnects from the protocol port, the child process is killed and the forwarder terminates. let (cmd, args) = sleep_cmd(100); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let mut forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let mut forwarder = TestForwarder::start(cmd, &args_refs)?; { let _stream = forwarder.connect_protocol(); @@ -766,10 +818,11 @@ fn test_protocol_disconnect_kills_child() { forwarder.wait_for_exit(), "Forwarder should exit when protocol client disconnects" ); + Ok(()) } #[test] -fn test_stderr_port_buffered_stderr_replay() { +fn test_stderr_port_buffered_stderr_replay() -> Result<(), anyhow::Error> { // * [x] When a client connects to the `stderr` port, it first receives all buffered `stderr` data // produced before the connection was established. // * [x] After the buffered `stderr` data has been sent, the client continues to receive newly produced @@ -777,33 +830,34 @@ fn test_stderr_port_buffered_stderr_replay() { // Use a script that produces `stderr` immediately and then waits. let (cmd, args) = multi_echo_stderr_cmd("buffered error", 1.0, "realtime error", 10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Now connect to `stderr`--buffering ensures we receive output produced before connection. thread::sleep(Duration::from_millis(100)); // Connect and we should receive the buffered output first. - let mut stream = forwarder.connect_stderr(); + let mut stream = forwarder.connect_stderr()?; // Read the output. - let output = read_all_available(&mut stream, Duration::from_secs(2)); + let read_timeout = Duration::from_secs(5); + let output = read_all_available(&mut stream, read_timeout)?; let output_str = String::from_utf8_lossy(&output); // Verify we got both buffered and realtime output. assert!(output_str.contains("buffered error")); assert!(output_str.contains("realtime error")); + Ok(()) } #[test] -fn test_stderr_disconnect_does_not_kill_child() { +fn test_stderr_disconnect_does_not_kill_child() -> Result<(), anyhow::Error> { // * [x] When a client disconnects from the `stderr` port, neither the forwarder nor the child // process terminate because of that. let (cmd, args) = sleep_cmd(10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; { let _stderr_stream = forwarder.connect_stderr(); // Disconnect by dropping the stream. @@ -814,13 +868,14 @@ fn test_stderr_disconnect_does_not_kill_child() { // Forwarder should still be running--we can connect to health port. assert!( - forwarder.connect_health().peer_addr().is_ok(), + forwarder.connect_health()?.peer_addr().is_ok(), "Forwarder should still be running after stderr disconnect" ); + Ok(()) } #[test] -fn test_stderr_port_reconnect_continues_from_current_state() { +fn test_stderr_port_reconnect_continues_from_current_state() -> Result<(), anyhow::Error> { // * [x] When a client connects to the `stderr` port, it first receives all buffered `stderr` data // produced before the connection was established and after a previous connection was active // (i.e., no logging data is lost). @@ -832,22 +887,22 @@ fn test_stderr_port_reconnect_continues_from_current_state() { // - "while_disconnected" after 3 seconds (buffered while no client connected) // - "during_second_connection" after 5 seconds (sent to second client) let (cmd, args) = complex_stderr_reconnect_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Wait to ensure "before_connection" is buffered. thread::sleep(Duration::from_millis(100)); // First connection--connect, read initial data, then disconnect BEFORE "trigger_disconnect". { - let mut stream = forwarder.connect_stderr(); + let mut stream = forwarder.connect_stderr()?; // Read for 800ms to get "before_connection" (immediate) and "during_first_connection" (at t=0.5s) - let output = read_all_available(&mut stream, Duration::from_millis(800)); + let output = read_all_available(&mut stream, Duration::from_millis(800))?; let output_str = String::from_utf8_lossy(&output); assert!(output_str.contains("before_connection")); assert!(output_str.contains("during_first_connection")); // Disconnect now (at ~t=1.0s), before "trigger_disconnect" (at t=1.5s) - let _ = stream.shutdown(std::net::Shutdown::Both); + drop(stream.shutdown(Shutdown::Both)); drop(stream); assert!( !output_str.contains("trigger_disconnect"), @@ -864,91 +919,91 @@ fn test_stderr_port_reconnect_continues_from_current_state() { // Second connection--should receive all buffered data (trigger_disconnect, while_disconnected) // and realtime data (during_second_connection). No logging data must be lost. { - let mut stream = forwarder.connect_stderr(); + let mut stream = forwarder.connect_stderr()?; // Read for 2.5s to get buffered and realtime data - let output = read_all_available(&mut stream, Duration::from_millis(2500)); + let output = read_all_available(&mut stream, Duration::from_millis(2500))?; let output_str = String::from_utf8_lossy(&output); assert!( output_str.contains("trigger_disconnect"), - "Second client should receive 'trigger_disconnect' (buffered during disconnect), got: {}", - output_str + "Second client should receive 'trigger_disconnect' (buffered during disconnect), got: {output_str}", ); assert!( output_str.contains("while_disconnected"), - "Second client should receive 'while_disconnected' (buffered during disconnect), got: {}", - output_str + "Second client should receive 'while_disconnected' (buffered during disconnect), got: {output_str}", ); assert!( output_str.contains("during_second_connection"), - "Second client should receive realtime data, got: {}", - output_str + "Second client should receive realtime data, got: {output_str}", ); } + Ok(()) } #[test] -fn test_output_buffering_prevents_data_loss() { +fn test_output_buffering_prevents_data_loss() -> Result<(), anyhow::Error> { // * [x] Output buffering must prevent loss of `stdout` and `stderr` data when no client is connected yet. // Start a process that produces output immediately. let (cmd, args) = combined_output_cmd("stdout message", "stderr message", 10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Buffering ensures output is captured even if we connect immediately. thread::sleep(Duration::from_millis(100)); // Now connect--we should receive the buffered output. - let mut stdout_stream = forwarder.connect_protocol(); - let stdout_data = read_all_available(&mut stdout_stream, Duration::from_millis(500)); + let mut stdout_stream = forwarder.connect_protocol()?; + let stdout_data = read_all_available(&mut stdout_stream, Duration::from_millis(500))?; let stdout_str = String::from_utf8_lossy(&stdout_data); - let mut stderr_stream = forwarder.connect_stderr(); - let stderr_data = read_all_available(&mut stderr_stream, Duration::from_millis(500)); + let mut stderr_stream = forwarder.connect_stderr()?; + let stderr_data = read_all_available(&mut stderr_stream, Duration::from_millis(500))?; let stderr_str = String::from_utf8_lossy(&stderr_data); assert!(stdout_str.contains("stdout message")); assert!(stderr_str.contains("stderr message")); + Ok(()) } #[test] -fn test_health_port_indicates_readiness() { +fn test_health_port_indicates_readiness() -> Result<(), anyhow::Error> { // * [x] A successful TCP connection to the health port indicates that the forwarder is ready to accept connections and operate normally. let (cmd, args) = sleep_cmd(10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // If we can connect to health port, the forwarder is ready. - let health_stream = forwarder.connect_health(); - assert!(health_stream.peer_addr().is_ok()); + let health_stream = forwarder.connect_health()?; + health_stream.peer_addr()?; // And the other ports should also be accessible. - assert!(forwarder.connect_protocol().peer_addr().is_ok()); - assert!(forwarder.connect_stderr().peer_addr().is_ok()); + forwarder.connect_protocol()?.peer_addr()?; + forwarder.connect_stderr()?.peer_addr()?; + Ok(()) } #[test] -fn test_health_checks_do_not_interfere() { +fn test_health_checks_do_not_interfere() -> Result<(), anyhow::Error> { // * [x] Health checks on the health port must not interfere with the behavior of the protocol port or the `stderr` port. // Use a process that produces high-volume output on both `stdout` and `stderr`. // Output a unique numbered line every 10ms for 3 seconds (300 lines on each stream). let (cmd, args) = numbered_output_loop_cmd(300, 10); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Spawn a thread to continuously perform health checks for 3.5 seconds. let health_port = forwarder.ports.health_port(); let health_check_handle = thread::spawn(move || { - let start = std::time::Instant::now(); - let mut check_count = 0; + let start = Instant::now(); + let mut check_count = 0_usize; while start.elapsed() < Duration::from_millis(3500) { if TcpStream::connect(("127.0.0.1", health_port)).is_ok() { - check_count += 1; + check_count += 1_usize; } thread::sleep(Duration::from_millis(20)); } @@ -956,10 +1011,10 @@ fn test_health_checks_do_not_interfere() { }); // Spawn a thread to read from the protocol port (`stdout`). - let mut protocol_stream = forwarder.connect_protocol(); + let mut protocol_stream = forwarder.connect_protocol()?; let protocol_handle = thread::spawn(move || { let mut all_output = Vec::new(); - let mut buffer = [0u8; 8192]; + let mut buffer = [0_u8; 8192]; protocol_stream .set_read_timeout(Some(Duration::from_secs(4))) .ok(); @@ -967,9 +1022,9 @@ fn test_health_checks_do_not_interfere() { loop { match protocol_stream.read(&mut buffer) { Ok(0) => break, - Ok(n) => all_output.extend_from_slice(&buffer[..n]), - Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Ok(n) => all_output.extend_from_slice(buffer.get(..n).unwrap_or_default()), + Err(error) if error.kind() == io::ErrorKind::TimedOut => break, + Err(error) if error.kind() == io::ErrorKind::WouldBlock => break, Err(_) => break, } } @@ -977,10 +1032,10 @@ fn test_health_checks_do_not_interfere() { }); // Spawn a thread to read from the stderr port (`stderr`). - let mut stderr_stream = forwarder.connect_stderr(); + let mut stderr_stream = forwarder.connect_stderr()?; let stderr_handle = thread::spawn(move || { let mut all_output = Vec::new(); - let mut buffer = [0u8; 8192]; + let mut buffer = [0_u8; 8192]; stderr_stream .set_read_timeout(Some(Duration::from_secs(4))) .ok(); @@ -988,9 +1043,9 @@ fn test_health_checks_do_not_interfere() { loop { match stderr_stream.read(&mut buffer) { Ok(0) => break, - Ok(n) => all_output.extend_from_slice(&buffer[..n]), - Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Ok(n) => all_output.extend_from_slice(buffer.get(..n).unwrap_or_default()), + Err(error) if error.kind() == io::ErrorKind::TimedOut => break, + Err(error) if error.kind() == io::ErrorKind::WouldBlock => break, Err(_) => break, } } @@ -1000,9 +1055,13 @@ fn test_health_checks_do_not_interfere() { // Wait for all threads to complete. let health_check_count = health_check_handle .join() - .expect("Health check thread panicked"); - let protocol_output = protocol_handle.join().expect("Protocol thread panicked"); - let stderr_output = stderr_handle.join().expect("Stderr thread panicked"); + .map_err(|error| anyhow::anyhow!("Health check thread panicked: {error:?}"))?; + let protocol_output = protocol_handle + .join() + .map_err(|error| anyhow::anyhow!("Protocol thread panicked: {error:?}"))?; + let stderr_output = stderr_handle + .join() + .map_err(|error| anyhow::anyhow!("Stderr thread panicked: {error:?}"))?; // Verify that health checks were performed successfully. // Note: The count varies significantly by platform (Linux/Windows: ~150+, macOS: ~35-40) @@ -1010,8 +1069,7 @@ fn test_health_checks_do_not_interfere() { // a reasonable number of health checks occurred without interfering with data transfer. assert!( health_check_count > 20, - "Should have performed multiple health checks (got {})", - health_check_count + "Should have performed multiple health checks (got {health_check_count})", ); // Verify that we received substantial data on both ports despite constant health checks. @@ -1024,14 +1082,12 @@ fn test_health_checks_do_not_interfere() { assert!( protocol_line_count >= 250, - "Should have received most stdout lines despite health checks (got {})", - protocol_line_count + "Should have received most stdout lines despite health checks (got {protocol_line_count})", ); assert!( stderr_line_count >= 250, - "Should have received most stderr lines despite health checks (got {})", - stderr_line_count + "Should have received most stderr lines despite health checks (got {stderr_line_count})", ); // Verify data integrity: check for a few specific lines. @@ -1039,10 +1095,12 @@ fn test_health_checks_do_not_interfere() { assert!(protocol_str.contains("stdout_line_100")); assert!(stderr_str.contains("stderr_line_1")); assert!(stderr_str.contains("stderr_line_100")); + + Ok(()) } #[test] -fn test_works_with_various_executables() { +fn test_works_with_various_executables() -> Result<(), anyhow::Error> { // * [x] The forwarder must work not only for Python applications, but for any executable child process. // Test with various common executables. @@ -1050,32 +1108,36 @@ fn test_works_with_various_executables() { // Test with `echo` via shell command. { let (cmd, args) = echo_with_sleep_cmd("test1", 2); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - let mut stream = forwarder.connect_protocol(); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; + let mut stream = forwarder.connect_protocol()?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; assert!(String::from_utf8_lossy(&output).contains("test1")); } // Test with another echo. { let (cmd, args) = echo_with_sleep_cmd("test2", 2); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - let mut stream = forwarder.connect_protocol(); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; + let mut stream = forwarder.connect_protocol()?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; assert!(String::from_utf8_lossy(&output).contains("test2")); } // Test with `cat` (interactive). { let (cmd, args) = cat_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - let mut stream = forwarder.connect_protocol(); - stream.write_all(b"test3\n").expect("Failed to write"); - stream.flush().expect("Failed to flush"); - let output = read_all_available(&mut stream, Duration::from_millis(500)); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; + let mut stream = forwarder.connect_protocol()?; + stream + .write_all(b"test3\n") + .map_err(|error| anyhow::anyhow!("Failed to write to protocol port: {error}"))?; + stream + .flush() + .map_err(|error| anyhow::anyhow!("Failed to flush protocol port: {error}"))?; + let output = read_all_available(&mut stream, Duration::from_millis(500))?; assert!(String::from_utf8_lossy(&output).contains("test3")); } @@ -1086,81 +1148,84 @@ fn test_works_with_various_executables() { let forwarder = TestForwarder::start( python, &["-u", "-c", "import time; print('test4'); time.sleep(2)"], - ); - let mut stream = forwarder.connect_protocol(); - // Delay to ensure Python has started and produced output. - thread::sleep(Duration::from_millis(300)); - // Increased timeout to account for Python interpreter startup (especially on Windows). - let output = read_all_available(&mut stream, Duration::from_millis(3000)); + )?; + let mut stream = forwarder.connect_protocol()?; + + let read_timeout = Duration::from_secs(5); // Python startup (especially slow on Windows). + let output = read_all_available(&mut stream, read_timeout)?; assert!( String::from_utf8_lossy(&output).contains("test4"), "Expected 'test4' in output, got: {:?}", String::from_utf8_lossy(&output) ); } + + Ok(()) } #[test] -fn test_large_output_buffering() { +fn test_large_output_buffering() -> Result<(), anyhow::Error> { // Additional test: verify that large outputs are buffered correctly. // Generate a large output. let large_size = 100_000; let (cmd, args) = generate_large_output_cmd(large_size); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; // Wait for output to be generated. thread::sleep(Duration::from_millis(200)); // Connect and read the buffered output. - let mut stream = forwarder.connect_protocol(); - let mut total_read = 0; - let mut buffer = [0u8; 8192]; + let mut stream = forwarder.connect_protocol()?; + let mut total_read = 0_usize; + let mut buffer = [0_u8; 8192]; while total_read < large_size { match read_with_timeout(&mut stream, &mut buffer) { - Ok(0) => break, + Ok(0) | Err(_) => break, Ok(n) => total_read += n, - Err(_) => break, } } assert!( total_read >= large_size, - "Should have read at least {} bytes, got {}", - large_size, - total_read + "Should have read at least {large_size} bytes, got {total_read}", ); + Ok(()) } #[test] -fn test_concurrent_stdin_stdout_bidirectional() { +fn test_concurrent_stdin_stdout_bidirectional() -> Result<(), anyhow::Error> { // Additional test: verify bidirectional communication works correctly. // Use `cat` which echoes `stdin` to `stdout`. let (cmd, args) = cat_cmd(); - let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); - let forwarder = TestForwarder::start(cmd, &args_refs); - let mut stream = forwarder.connect_protocol(); + let args_refs: Vec<&str> = args.iter().map(String::as_str).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs)?; + let mut stream = forwarder.connect_protocol()?; // Send multiple lines and verify echo. - for i in 0..5 { + for i in 0_usize..5_usize { let message = format!("line {i}\n"); stream .write_all(message.as_bytes()) - .expect("Failed to write"); - stream.flush().expect("Failed to flush"); + .map_err(|error| anyhow::anyhow!("Failed to write to protocol port: {error}"))?; + stream + .flush() + .map_err(|error| anyhow::anyhow!("Failed to flush protocol port: {error}"))?; // Increased timeout for Python startup on Windows. - let output = read_all_available(&mut stream, Duration::from_millis(1500)); + let read_timeout = Duration::from_secs(5); + let output = read_all_available(&mut stream, read_timeout)?; let output_str = String::from_utf8_lossy(&output); assert!( output_str.contains(&format!("line {i}")), - "Expected 'line {i}' in output, got: {:?}", - output_str + "Expected 'line {i}' in output, got: {output_str:?}" ); } + + Ok(()) } // ============================================================================ @@ -1170,27 +1235,35 @@ fn test_concurrent_stdin_stdout_bidirectional() { mod lsp_client; #[test] -fn test_lsp_rust_analyzer_integration() { +fn test_lsp_rust_analyzer_integration() -> Result<(), anyhow::Error> { // Test that stdioxide can successfully tunnel LSP communication with `rust-analyzer`. // This validates the real-world use case of running a language server through the forwarder. - let mut forwarder = TestForwarder::start("rust-analyzer", &[]); - let stream = forwarder.connect_protocol(); + let mut forwarder = TestForwarder::start("rust-analyzer", &[])?; + let stream = forwarder.connect_protocol()?; - let mut lsp = LspClient::new(stream); + let mut lsp = LspClient::new(stream)?; // Initialize the LSP server. - let workspace_path = std::env::current_dir() - .expect("Failed to get current directory") + let workspace_path = env::current_dir() + .map_err(|error| anyhow::anyhow!("Failed to get current directory: {error}"))? .to_string_lossy() .to_string(); - let root_uri = format!("file://{}", workspace_path); + let root_uri = format!("file://{workspace_path}"); - let init_response = lsp.initialize(&root_uri); + let init_response = lsp.initialize(&root_uri)?; - assert_eq!(init_response["jsonrpc"], "2.0"); + assert_eq!( + init_response + .get("jsonrpc") + .ok_or_else(|| anyhow::anyhow!("Expected 'jsonrpc' field in initialize response"))?, + "2.0" + ); assert!( - init_response["result"]["capabilities"].is_object(), + init_response + .get("result") + .and_then(|result| result.get("capabilities")) + .is_some_and(serde_json::Value::is_object), "Should receive server capabilities" ); @@ -1200,20 +1273,28 @@ fn test_lsp_rust_analyzer_integration() { // Open a document (src/main.rs). let main_rs_path = format!("{workspace_path}/src/main.rs"); let main_rs_uri = format!("file://{main_rs_path}"); - let main_rs_content = - std::fs::read_to_string(&main_rs_path).expect("Failed to read src/main.rs"); + let main_rs_content = read_to_string(&main_rs_path) + .map_err(|error| anyhow::anyhow!("Failed to read src/main.rs: {error}"))?; - lsp.did_open(&main_rs_uri, "rust", main_rs_content); + lsp.did_open(&main_rs_uri, "rust", &main_rs_content); // Request document symbols. - let symbols_response = lsp.document_symbol(&main_rs_uri); - - assert_eq!(symbols_response["jsonrpc"], "2.0"); + let symbols_response = lsp.document_symbol(&main_rs_uri)?; + + assert_eq!( + symbols_response + .get("jsonrpc") + .ok_or_else(|| anyhow::anyhow!( + "Expected 'jsonrpc' field in document symbol response" + ))?, + "2.0" + ); // Verify we got some symbols (src/main.rs should have at least the main function). - let symbols = symbols_response["result"] - .as_array() - .expect("Expected array of symbols"); + let symbols = symbols_response + .get("result") + .and_then(|result| result.as_array()) + .ok_or_else(|| anyhow::anyhow!("Expected array of symbols"))?; assert!( !symbols.is_empty(), @@ -1228,8 +1309,13 @@ fn test_lsp_rust_analyzer_integration() { ); // Shutdown the LSP server. - let shutdown_response = lsp.shutdown(); - assert_eq!(shutdown_response["result"], serde_json::Value::Null); + let shutdown_response = lsp.shutdown()?; + assert_eq!( + shutdown_response + .get("result") + .ok_or_else(|| anyhow::anyhow!("Expected 'result' field in shutdown response"))?, + &serde_json::Value::Null + ); // Exit notification is sent automatically when lsp is dropped. drop(lsp); @@ -1240,4 +1326,5 @@ fn test_lsp_rust_analyzer_integration() { forwarder.wait_for_exit(), "Forwarder should exit after LSP client disconnects from protocol port" ); + Ok(()) } diff --git a/tests/lsp_client.rs b/tests/lsp_client.rs index 6a81b3a..24beede 100644 --- a/tests/lsp_client.rs +++ b/tests/lsp_client.rs @@ -1,10 +1,26 @@ +//! Test utilities for LSP client communication. + +#![allow( + unreachable_pub, + reason = "Private test module, but items need `pub` for parent access" +)] + use std::{ - io::{Read, Write}, + io::{self, Read, Write}, net::TcpStream, thread, time::Duration, }; +// Acknowledge available dev-dependencies not used in this test file. +use anyhow as _; +use clap as _; +use lsp_types as _; +use stdioxide as _; +use subprocess as _; +use tracing as _; +use tracing_subscriber as _; + /// RAII wrapper for LSP communication over a TCP stream. /// Automatically sends the exit notification when dropped. pub struct LspClient { @@ -14,24 +30,28 @@ pub struct LspClient { impl LspClient { /// Create a new LSP client from a TCP stream. - pub fn new(stream: TcpStream) -> Self { + /// + /// # Errors + /// + /// Returns an error if setting read or write timeouts on the stream fails. + pub fn new(stream: TcpStream) -> Result { // Set reasonable timeouts for LSP communication. stream .set_read_timeout(Some(Duration::from_secs(10))) - .expect("Failed to set read timeout"); + .map_err(|error| anyhow::anyhow!("Failed to set read timeout: {error}"))?; stream .set_write_timeout(Some(Duration::from_secs(5))) - .expect("Failed to set write timeout"); + .map_err(|error| anyhow::anyhow!("Failed to set write timeout: {error}"))?; - Self { + Ok(Self { stream, next_request_id: 1, - } + }) } /// Send an LSP message over the stream. /// LSP uses JSON-RPC 2.0 with a Content-Length header. - fn send_message(&mut self, message: &serde_json::Value) -> std::io::Result<()> { + fn send_message(&mut self, message: &serde_json::Value) -> io::Result<()> { let json_str = serde_json::to_string(message)?; let content = format!("Content-Length: {}\r\n\r\n{}", json_str.len(), json_str); self.stream.write_all(content.as_bytes())?; @@ -41,24 +61,21 @@ impl LspClient { /// Read an LSP message from the stream. /// Returns the parsed JSON value. - fn read_message(&mut self) -> std::io::Result { + fn read_message(&mut self) -> anyhow::Result { // Read the Content-Length header. let mut header = String::new(); - let mut buffer = [0u8; 1]; + let mut buffer = [0_u8; 1]; // Read until we find "\r\n\r\n" loop { self.stream.read_exact(&mut buffer)?; - header.push(buffer[0] as char); + header.push(buffer[0].into()); if header.ends_with("\r\n\r\n") { break; } // Prevent infinite loops on malformed headers if header.len() > 1000 { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "Header too long", - )); + return Err(anyhow::anyhow!("Header too long")); } } @@ -68,12 +85,10 @@ impl LspClient { .find(|line| line.starts_with("Content-Length:")) .and_then(|line| line.strip_prefix("Content-Length:")) .and_then(|len_str| len_str.trim().parse::().ok()) - .ok_or_else(|| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing Content-Length") - })?; + .ok_or_else(|| anyhow::anyhow!("Missing Content-Length"))?; // Read the JSON content. - let mut content = vec![0u8; content_length]; + let mut content = vec![0_u8; content_length]; self.stream.read_exact(&mut content)?; // Parse JSON. @@ -82,9 +97,13 @@ impl LspClient { } /// Send an LSP request and return the next request ID to use. - fn send_request(&mut self, method: &str, params: serde_json::Value) -> i32 { + fn send_request( + &mut self, + method: &str, + params: &serde_json::Value, + ) -> Result { let request_id = self.next_request_id; - self.next_request_id += 1; + self.next_request_id = self.next_request_id.wrapping_add(1_i32); let request = serde_json::json!({ "jsonrpc": "2.0", @@ -94,12 +113,16 @@ impl LspClient { }); self.send_message(&request) - .expect("Failed to send LSP request"); - request_id + .map_err(|error| anyhow::anyhow!("Failed to send LSP request: {error}"))?; + Ok(request_id) } /// Send an LSP notification (no response expected). - fn send_notification(&mut self, method: &str, params: serde_json::Value) { + fn send_notification( + &mut self, + method: &str, + params: &serde_json::Value, + ) -> Result<(), anyhow::Error> { let notification = serde_json::json!({ "jsonrpc": "2.0", "method": method, @@ -107,35 +130,45 @@ impl LspClient { }); self.send_message(¬ification) - .expect("Failed to send LSP notification"); + .map_err(|error| anyhow::anyhow!("Failed to send LSP notification: {error}"))?; + Ok(()) } /// Read responses until we get a response with the specified ID. /// Skips notifications that may arrive in between. - fn read_response(&mut self, expected_id: i32) -> serde_json::Value { - for _ in 0..20 { + fn read_response(&mut self, expected_id: i32) -> anyhow::Result { + for _ in 0_usize..20_usize { match self.read_message() { Ok(msg) => { // Check if this is our response. if msg.get("id") == Some(&serde_json::json!(expected_id)) { - return msg; + return Ok(msg); } // Otherwise, it's a notification, keep reading. } - Err(e) if e.kind() == std::io::ErrorKind::TimedOut => { + Err(error) + if error + .downcast_ref::() + .is_some_and(|io_error| io_error.kind() == io::ErrorKind::TimedOut) => + { thread::sleep(Duration::from_millis(100)); - continue; } - Err(e) => { - panic!("Failed to read LSP response: {}", e); + Err(error) => { + return Err(anyhow::anyhow!("Failed to read LSP response: {error}")); } } } - panic!("Did not receive response with id {}", expected_id); + Err(anyhow::anyhow!( + "Did not receive response with id {expected_id}" + )) } /// Initialize the LSP server with the given workspace root. - pub fn initialize(&mut self, root_uri: &str) -> serde_json::Value { + /// + /// # Errors + /// + /// Returns an error if the initialize request fails to send or the response cannot be read. + pub fn initialize(&mut self, root_uri: &str) -> anyhow::Result { let params = serde_json::json!({ "processId": null, "rootUri": root_uri, @@ -148,44 +181,52 @@ impl LspClient { } }); - let request_id = self.send_request("initialize", params); + let request_id = self.send_request("initialize", ¶ms)?; self.read_response(request_id) } /// Send the initialized notification. pub fn initialized(&mut self) { - self.send_notification("initialized", serde_json::json!({})); + drop(self.send_notification("initialized", &serde_json::json!({}))); } /// Open a document. - pub fn did_open(&mut self, uri: &str, language_id: &str, text: String) { + pub fn did_open(&mut self, uri: &str, language_id: &str, text: &str) { let params = serde_json::json!({ "textDocument": { "uri": uri, "languageId": language_id, - "version": 1, + "version": 1_i32, "text": text } }); - self.send_notification("textDocument/didOpen", params); + drop(self.send_notification("textDocument/didOpen", ¶ms)); } /// Request document symbols for a file. - pub fn document_symbol(&mut self, uri: &str) -> serde_json::Value { + /// + /// # Errors + /// + /// Returns an error if the document symbol request fails to send or the response cannot be read. + pub fn document_symbol(&mut self, uri: &str) -> anyhow::Result { let params = serde_json::json!({ "textDocument": { "uri": uri } }); - let request_id = self.send_request("textDocument/documentSymbol", params); + let request_id = self.send_request("textDocument/documentSymbol", ¶ms)?; self.read_response(request_id) } /// Shutdown the LSP server. - pub fn shutdown(&mut self) -> serde_json::Value { - let request_id = self.send_request("shutdown", serde_json::json!(null)); + /// + /// # Errors + /// + /// Returns an error if the shutdown request fails to send or the response cannot be read. + pub fn shutdown(&mut self) -> anyhow::Result { + let request_id = self.send_request("shutdown", &serde_json::json!(null))?; self.read_response(request_id) } } @@ -193,9 +234,9 @@ impl LspClient { impl Drop for LspClient { fn drop(&mut self) { // Automatically send exit notification when the client is dropped. - let _ = self.send_message(&serde_json::json!({ + drop(self.send_message(&serde_json::json!({ "jsonrpc": "2.0", "method": "exit" - })); + }))); } } diff --git a/tests/test_utils.rs b/tests/test_utils.rs index 35eb397..0eca61f 100644 --- a/tests/test_utils.rs +++ b/tests/test_utils.rs @@ -7,91 +7,117 @@ //! The goal is to make integration tests work on all platforms without #[cfg(not(windows))] //! guards scattered throughout the test code. +#![allow( + unreachable_pub, + reason = "Private test module, but items need `pub` for parent access" +)] + +// Acknowledge available dev-dependencies not used in this test file. +use anyhow as _; +use clap as _; +use lsp_types as _; +use serde_json as _; +use stdioxide as _; +use subprocess as _; +use tracing as _; +use tracing_subscriber as _; + +/// Returns a command that sleeps for the specified number of seconds. #[cfg(windows)] +#[must_use] pub fn sleep_cmd(seconds: u32) -> (&'static str, Vec) { // Use ping as a sleep alternative on Windows // Pings localhost N+1 times with 1-second intervals (approximately N seconds total) - let pings = seconds + 1; + let pings = seconds.saturating_add(1); ( "ping", - vec!["-n".to_string(), pings.to_string(), "127.0.0.1".to_string()], + vec!["-n".to_owned(), pings.to_string(), "127.0.0.1".to_owned()], ) } +/// Returns a command that sleeps for the specified number of seconds. #[cfg(not(windows))] +#[must_use] pub fn sleep_cmd(seconds: u32) -> (&'static str, Vec) { ("sleep", vec![seconds.to_string()]) } +/// Returns a command that echoes text to `stdout`, then sleeps. #[cfg(windows)] +#[must_use] pub fn echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { - let pings = seconds + 1; + let pings = seconds.saturating_add(1); ( "cmd", vec![ - "/C".to_string(), - format!("echo {} && ping -n {} 127.0.0.1 >nul", text, pings), + "/C".to_owned(), + format!("echo {text} && ping -n {pings} 127.0.0.1 >nul"), ], ) } +/// Returns a command that echoes text to `stdout`, then sleeps. #[cfg(not(windows))] +#[must_use] pub fn echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { ( "bash", - vec![ - "-c".to_string(), - format!("echo '{}' && sleep {}", text, seconds), - ], + vec!["-c".to_owned(), format!("echo '{text}' && sleep {seconds}")], ) } +/// Returns a command that echoes text to `stderr`, then sleeps. #[cfg(windows)] +#[must_use] pub fn stderr_echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { - let pings = seconds + 1; + let pings = seconds.saturating_add(1); ( "cmd", vec![ - "/C".to_string(), - format!("echo {} 1>&2 && ping -n {} 127.0.0.1 >nul", text, pings), + "/C".to_owned(), + format!("echo {text} 1>&2 && ping -n {pings} 127.0.0.1 >nul"), ], ) } +/// Returns a command that echoes text to `stderr`, then sleeps. #[cfg(not(windows))] +#[must_use] pub fn stderr_echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { ( "bash", vec![ - "-c".to_string(), - format!("echo '{}' >&2 && sleep {}", text, seconds), + "-c".to_owned(), + format!("echo '{text}' >&2 && sleep {seconds}"), ], ) } +/// Returns a command that echoes to `stderr`, sleeps, echoes again, then sleeps more. #[cfg(windows)] +#[must_use] pub fn multi_echo_stderr_cmd( buffered: &str, sleep1: f32, realtime: &str, sleep2: u32, ) -> (&'static str, Vec) { - let sleep1_ms = (sleep1 * 1000.0) as u32; - let pings = sleep2 + 1; + let pings = sleep2.saturating_add(1); ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), format!( - "[Console]::Error.WriteLine('{}'); Start-Sleep -Milliseconds {}; [Console]::Error.WriteLine('{}'); ping -n {} 127.0.0.1 >$null", - buffered, sleep1_ms, realtime, pings + "[Console]::Error.WriteLine('{buffered}'); Start-Sleep -Seconds {sleep1}; [Console]::Error.WriteLine('{realtime}'); ping -n {pings} 127.0.0.1 >$null" ), ], ) } +/// Returns a command that echoes to `stderr`, sleeps, echoes again, then sleeps more. #[cfg(not(windows))] +#[must_use] pub fn multi_echo_stderr_cmd( buffered: &str, sleep1: f32, @@ -101,38 +127,37 @@ pub fn multi_echo_stderr_cmd( ( "bash", vec![ - "-c".to_string(), - format!( - "echo '{}' >&2; sleep {}; echo '{}' >&2; sleep {}", - buffered, sleep1, realtime, sleep2 - ), + "-c".to_owned(), + format!("echo '{buffered}' >&2; sleep {sleep1}; echo '{realtime}' >&2; sleep {sleep2}"), ], ) } +/// Returns a command that echoes to `stdout`, sleeps, echoes again, then sleeps more. #[cfg(windows)] +#[must_use] pub fn multi_echo_stdout_cmd( buffered: &str, sleep1: f32, realtime: &str, sleep2: u32, ) -> (&'static str, Vec) { - let sleep1_ms = (sleep1 * 1000.0) as u32; - let pings = sleep2 + 1; + let pings = sleep2.saturating_add(1); ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), format!( - "Write-Output '{}'; Start-Sleep -Milliseconds {}; Write-Output '{}'; ping -n {} 127.0.0.1 >$null", - buffered, sleep1_ms, realtime, pings + "Write-Output '{buffered}'; Start-Sleep -Seconds {sleep1}; Write-Output '{realtime}'; ping -n {pings} 127.0.0.1 >$null" ), ], ) } +/// Returns a command that echoes to `stdout`, sleeps, echoes again, then sleeps more. #[cfg(not(windows))] +#[must_use] pub fn multi_echo_stdout_cmd( buffered: &str, sleep1: f32, @@ -142,130 +167,152 @@ pub fn multi_echo_stdout_cmd( ( "bash", vec![ - "-c".to_string(), - format!( - "echo '{}'; sleep {}; echo '{}'; sleep {}", - buffered, sleep1, realtime, sleep2 - ), + "-c".to_owned(), + format!("echo '{buffered}'; sleep {sleep1}; echo '{realtime}'; sleep {sleep2}",), ], ) } +/// Returns a command that reads from `stdin` and echoes to `stdout` (like `cat`). #[cfg(windows)] +#[must_use] pub fn cat_cmd() -> (&'static str, Vec) { // Use Python for reliable line-by-line I/O on Windows. // `-u` flag disables buffering for immediate output. ( python_cmd(), vec![ - "-u".to_string(), - "-c".to_string(), - "import sys; [print(line.rstrip()) for line in sys.stdin]".to_string(), + "-u".to_owned(), + "-c".to_owned(), + "import sys; [print(line.rstrip()) for line in sys.stdin]".to_owned(), ], ) } +/// Returns a command that reads from `stdin` and echoes to `stdout` (like `cat`). #[cfg(not(windows))] -pub fn cat_cmd() -> (&'static str, Vec) { +#[must_use] +pub const fn cat_cmd() -> (&'static str, Vec) { ("cat", vec![]) } +/// Returns a command that continuously reads from `stdin` and writes "response" to `stdout`. #[cfg(windows)] +#[must_use] pub fn loop_stdin_to_stdout_cmd() -> (&'static str, Vec) { // PowerShell script that reads line by line and echoes - // Use [Console]::In to read from stdin and [Console]::WriteLine() for immediate flushing + // Use [Console]::In to read from `stdin` and [Console]::WriteLine() for immediate flushing ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), "while($line = [Console]::In.ReadLine()) { [Console]::WriteLine('response') }" - .to_string(), + .to_owned(), ], ) } +/// Returns a command that continuously reads from `stdin` and writes "response" to `stdout`. #[cfg(not(windows))] +#[must_use] pub fn loop_stdin_to_stdout_cmd() -> (&'static str, Vec) { ( "bash", vec![ - "-c".to_string(), - "while true; do read line; echo response; done".to_string(), + "-c".to_owned(), + "while true; do read line; echo response; done".to_owned(), ], ) } +/// Returns a command that continuously writes "error" to `stderr` in a loop. #[cfg(windows)] +#[must_use] pub fn continuous_stderr_loop_cmd() -> (&'static str, Vec) { ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), "while($true) { [Console]::Error.WriteLine('error'); Start-Sleep -Milliseconds 100 }" - .to_string(), + .to_owned(), ], ) } +/// Returns a command that continuously writes "error" to `stderr` in a loop. #[cfg(not(windows))] +#[must_use] pub fn continuous_stderr_loop_cmd() -> (&'static str, Vec) { ( "bash", vec![ - "-c".to_string(), - "while true; do echo error >&2; sleep 0.1; done".to_string(), + "-c".to_owned(), + "while true; do echo error >&2; sleep 0.1; done".to_owned(), ], ) } +/// Returns a command that generates a large block of output (repeated 'A' characters). #[cfg(windows)] +#[must_use] pub fn generate_large_output_cmd(size: usize) -> (&'static str, Vec) { // Generate large output using PowerShell ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - format!("'A' * {}; ping -n 11 127.0.0.1 >$null", size), + "-NoProfile".to_owned(), + "-Command".to_owned(), + format!("'A' * {size}; ping -n 11 127.0.0.1 >$null"), ], ) } +/// Returns a command that generates a large block of output (repeated 'A' characters). #[cfg(not(windows))] +#[must_use] pub fn generate_large_output_cmd(size: usize) -> (&'static str, Vec) { ( "bash", vec![ - "-c".to_string(), - format!("head -c {} /dev/zero | tr '\\0' 'A'; sleep 10", size), + "-c".to_owned(), + format!("head -c {size} /dev/zero | tr '\\0' 'A'; sleep 10"), ], ) } +/// Returns a command that outputs numbered lines to both `stdout` and `stderr` with delays. #[cfg(windows)] +#[must_use] pub fn numbered_output_loop_cmd(count: u32, interval_ms: u32) -> (&'static str, Vec) { ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), format!( - "1..{} | ForEach-Object {{ Write-Output \"stdout_line_$_\"; [Console]::Error.WriteLine(\"stderr_line_$_\"); Start-Sleep -Milliseconds {} }}", - count, interval_ms + "1..{count} | ForEach-Object {{ Write-Output \"stdout_line_$_\"; [Console]::Error.WriteLine(\"stderr_line_$_\"); Start-Sleep -Milliseconds {interval_ms} }}" ), ], ) } +/// Returns a command that outputs numbered lines to both `stdout` and `stderr` with delays. #[cfg(not(windows))] +#[must_use] pub fn numbered_output_loop_cmd(count: u32, interval_ms: u32) -> (&'static str, Vec) { - let interval_sec = interval_ms as f32 / 1000.0; + #[expect( + clippy::integer_division, + reason = "Intentional conversion of milliseconds to seconds with fractional part" + )] + let seconds = interval_ms / 1000; + let millis = interval_ms % 1000; + let interval_sec = format!("{seconds}.{millis:03}"); ( "bash", vec![ - "-c".to_string(), + "-c".to_owned(), format!( "for i in {{1..{count}}}; do echo \"stdout_line_$i\"; echo \"stderr_line_$i\" >&2; sleep {interval_sec}; done" ), @@ -273,30 +320,34 @@ pub fn numbered_output_loop_cmd(count: u32, interval_ms: u32) -> (&'static str, ) } +/// Returns a command that emits timed `stderr` output for testing reconnection scenarios. #[cfg(windows)] +#[must_use] pub fn complex_stderr_reconnect_cmd() -> (&'static str, Vec) { ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), concat!( "[Console]::Error.WriteLine('before_connection'); Start-Sleep -Milliseconds 500; ", "[Console]::Error.WriteLine('during_first_connection'); Start-Sleep -Milliseconds 1000; ", "[Console]::Error.WriteLine('trigger_disconnect'); Start-Sleep -Milliseconds 1500; ", "[Console]::Error.WriteLine('while_disconnected'); Start-Sleep -Milliseconds 2000; ", "[Console]::Error.WriteLine('during_second_connection'); Start-Sleep -Seconds 10" - ).to_string(), + ).to_owned(), ], ) } +/// Returns a command that emits timed `stderr` output for testing reconnection scenarios. #[cfg(not(windows))] +#[must_use] pub fn complex_stderr_reconnect_cmd() -> (&'static str, Vec) { ( "bash", vec![ - "-c".to_string(), + "-c".to_owned(), concat!( "echo 'before_connection' >&2; sleep 0.5; ", "echo 'during_first_connection' >&2; sleep 1; ", @@ -304,32 +355,35 @@ pub fn complex_stderr_reconnect_cmd() -> (&'static str, Vec) { "echo 'while_disconnected' >&2; sleep 2; ", "echo 'during_second_connection' >&2; sleep 10", ) - .to_string(), + .to_owned(), ], ) } +/// Returns a command that outputs to both `stdout` and `stderr`, then sleeps. #[cfg(windows)] +#[must_use] pub fn combined_output_cmd( stdout_msg: &str, stderr_msg: &str, sleep_sec: u32, ) -> (&'static str, Vec) { - let pings = sleep_sec + 1; + let pings = sleep_sec.saturating_add(1); ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), + "-NoProfile".to_owned(), + "-Command".to_owned(), format!( - "Write-Output '{}'; [Console]::Error.WriteLine('{}'); ping -n {} 127.0.0.1 >$null", - stdout_msg, stderr_msg, pings + "Write-Output '{stdout_msg}'; [Console]::Error.WriteLine('{stderr_msg}'); ping -n {pings} 127.0.0.1 >$null" ), ], ) } +/// Returns a command that outputs to both `stdout` and `stderr`, then sleeps. #[cfg(not(windows))] +#[must_use] pub fn combined_output_cmd( stdout_msg: &str, stderr_msg: &str, @@ -338,69 +392,78 @@ pub fn combined_output_cmd( ( "bash", vec![ - "-c".to_string(), - format!( - "echo '{}'; echo '{}' >&2; sleep {}", - stdout_msg, stderr_msg, sleep_sec - ), + "-c".to_owned(), + format!("echo '{stdout_msg}'; echo '{stderr_msg}' >&2; sleep {sleep_sec}"), ], ) } +/// Returns a command that echoes all provided arguments to `stdout`. #[cfg(windows)] +#[must_use] pub fn echo_args_cmd(args: &[&str]) -> (&'static str, Vec) { - let mut cmd_args = vec!["/C".to_string()]; + let mut cmd_args = vec!["/C".to_owned()]; // Use echo %* to print all arguments on Windows (requires a batch context) // Alternative: build the echo command with all args let echo_str = args.join(" "); - cmd_args.push(format!("echo {} && ping -n 6 127.0.0.1 >nul", echo_str)); + cmd_args.push(format!("echo {echo_str} && ping -n 6 127.0.0.1 >nul")); ("cmd", cmd_args) } +/// Returns a command that echoes all provided arguments to `stdout`. #[cfg(not(windows))] +#[must_use] pub fn echo_args_cmd(args: &[&str]) -> (&'static str, Vec) { let mut script_args = vec![ - "-c".to_string(), - "echo $@ && sleep 5".to_string(), - "--".to_string(), + "-c".to_owned(), + "echo $@ && sleep 5".to_owned(), + "--".to_owned(), ]; - script_args.extend(args.iter().map(|s| s.to_string())); + script_args.extend(args.iter().map(ToString::to_string)); ("bash", script_args) } +/// Returns a command that outputs a message and exits quickly after a brief delay. #[cfg(windows)] +#[must_use] pub fn short_lived_cmd(msg: &str, sleep_ms: u32) -> (&'static str, Vec) { ( "powershell", vec![ - "-NoProfile".to_string(), - "-Command".to_string(), - format!( - "Write-Output '{}'; Start-Sleep -Milliseconds {}", - msg, sleep_ms - ), + "-NoProfile".to_owned(), + "-Command".to_owned(), + format!("Write-Output '{msg}'; Start-Sleep -Milliseconds {sleep_ms}"), ], ) } +/// Returns a command that outputs a message and exits quickly after a brief delay. #[cfg(not(windows))] +#[must_use] pub fn short_lived_cmd(msg: &str, sleep_ms: u32) -> (&'static str, Vec) { - let sleep_sec = sleep_ms as f32 / 1000.0; + #[expect( + clippy::integer_division, + reason = "Intentional conversion of milliseconds to seconds with fractional part" + )] + let seconds = sleep_ms / 1000; + let millis = sleep_ms % 1000; + let sleep_arg = format!("{seconds}.{millis:03}"); ( "bash", - vec![ - "-c".to_string(), - format!("echo {} && sleep {}", msg, sleep_sec), - ], + vec!["-c".to_owned(), format!("echo {msg} && sleep {sleep_arg}")], ) } +/// Returns the platform-specific Python command name. #[cfg(windows)] -pub fn python_cmd() -> &'static str { +#[must_use] +pub const fn python_cmd() -> &'static str { "python" } +/// Returns the platform-specific Python command name. #[cfg(not(windows))] -pub fn python_cmd() -> &'static str { +#[must_use] +pub const fn python_cmd() -> &'static str { "python3" }