diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2341562..ea48ac9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,21 @@ on: workflow_dispatch: jobs: + fmt: + name: Format Check + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v5 + + - name: Install stable Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + + - name: Check formatting + run: cargo fmt --check + rust: name: ${{ matrix.os }} / ${{ matrix.profile }} runs-on: ${{ matrix.os }} @@ -27,7 +42,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: Install stable Rust toolchain uses: dtolnay/rust-toolchain@stable @@ -36,6 +51,11 @@ jobs: - name: Cache Rust artifacts uses: Swatinem/rust-cache@v2 + with: + # Include profile in cache key to separate debug/release caches. + key: ${{ matrix.profile }} + # Save cache even if subsequent steps fail. + save-if: true - name: Show tool versions shell: bash diff --git a/Cargo.lock b/Cargo.lock index 4991fdc..c2cd1a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,603 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "clap" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ddb117e43bbf7dacf0a4190fef4d345b9bad68dfc649cb349e7d17d28428e51" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2ce8604710f6733aa641a2b3731eaa1e8b3d9973d5e3565da11800813f997a9" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "icu_collections" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" +dependencies = [ + "displaydoc", + "potential_utf", + "utf8_iter", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" + +[[package]] +name = "icu_properties" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" + +[[package]] +name = "icu_provider" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "libc" +version = "0.2.185" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" + +[[package]] +name = "litemap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" + +[[package]] +name = "lsp-types" +version = "0.95.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e34d33a8e9b006cd3fc4fe69a921affa097bae4bb65f76271f4644f9a334365" +dependencies = [ + "bitflags", + "serde", + "serde_json", + "serde_repr", + "url", +] + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "potential_utf" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" +dependencies = [ + "zerovec", +] + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "stdioxide" version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "lsp-types", + "serde_json", + "subprocess", +] + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "subprocess" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5b79b2cfda2d2dd2ef0640364f88f55f0ee895b1fdac29ab010b8c8ba1eb303" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", + "serde_derive", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "writeable" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" + +[[package]] +name = "yoke" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerofrom" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index db0fd1b..75e5675 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,13 @@ name = "stdioxide" version = "0.1.0" edition = "2024" +license = "MIT" [dependencies] +anyhow = "1.0.102" +clap = { version = "4.6.1", features = ["derive", "env"] } +subprocess = "1.0.3" + +[dev-dependencies] +lsp-types = "0.95" +serde_json = "1.0" diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d8fed16 --- /dev/null +++ b/LICENSE @@ -0,0 +1,8 @@ +Copyright 2026 Ninjaneers GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + diff --git a/README.md b/README.md new file mode 100644 index 0000000..2feaeea --- /dev/null +++ b/README.md @@ -0,0 +1,118 @@ +# stdioxide 🚀 + +A TCP forwarder that exposes a child process’s `stdin`, `stdout`, and `stderr` streams over the network. + +## Overview + +stdioxide launches an arbitrary child process and forwards its standard streams over two TCP ports, allowing remote interaction with any command-line application. Output is buffered to prevent data loss when no clients are connected. +In addition, it provides a third TCP port for health checks (e.g., to run it inside Kubernetes). + +## Motivation + +While several tools exist for TCP stream forwarding (such as `socat`, `netcat`, `tcpserver`, and `xinetd`), stdioxide addresses specific requirements for running processes in containerized environments: + +**Health Check Integration**: Unlike general-purpose forwarding tools, stdioxide provides a dedicated health check port that container orchestrators (Kubernetes, Docker Compose) can use for readiness and liveness probes. Tools like `socat` would require additional wrapper scripts to provide this functionality. + +**Lifecycle Coupling**: The forwarder automatically terminates when the child process exits, ensuring proper cleanup in container environments. Traditional tools like `xinetd` or `tcpserver` are designed to spawn processes on-demand but don’t couple their lifecycle to a single long-running child process. This coupling is essential for container orchestrators to correctly detect when the application has terminated. + +**Reconnectable Stderr**: Most TCP forwarding solutions don’t provide separate, reconnectable access to `stderr` with buffering. This is particularly valuable for collecting diagnostic logs from applications that may have intermittent monitoring connections. + +**Buffered Output**: stdioxide buffers both `stdout` and `stderr` to prevent data loss during client disconnections—a common scenario when running in environments with network instability or during rolling updates. + +### Example Use Cases + +- **Language Servers**: Run language servers (e.g., `rust-analyzer`, `pyright`, `typescript-language-server`) as network services. The protocol port provides `stdin`/`stdout` communication via the Language Server Protocol (LSP), the `stderr` port captures diagnostic logs, and the health port enables container orchestrators to monitor the language server’s availability. +- **Legacy CLI Tools**: Expose command-line applications that weren’t designed for network access as TCP services within containerized environments. +- **Batch Processors**: Wrap long-running data processing scripts with health monitoring and reconnectable log streaming. + +## Features ✨ + +- **Universal compatibility**: Works with any executable child process +- **Three TCP ports**: Protocol, `stderr`, and health endpoints +- **Output buffering**: No data loss when clients disconnect and reconnect +- **Configurable ports**: Via command-line arguments or environment variables +- **Automatic cleanup**: Forwarder terminates when child process exits + +## Installation + +```bash +cargo build --release +``` + +The binary will be available at `target/release/stdioxide`. + +## Usage + +```bash +stdioxide [OPTIONS] [ARGS...] +``` + +**Example:** + +```bash +stdioxide --protocol-port 7000 --stderr-port 7001 --health-port 7002 python my_script.py +``` + +### Arguments + +- ``: The child process to launch +- `[ARGS...]`: Arguments passed through unchanged to the child process + +### Options + +- `--protocol-port `: Protocol port (default: 7000) +- `--stderr-port `: Stderr port (default: 7001) +- `--health-port `: Health check port (default: 7002) + +Ports can also be configured via environment variables: + +- `STDIOXIDE_PROTOCOL_PORT` +- `STDIOXIDE_STDERR_PORT` +- `STDIOXIDE_HEALTH_PORT` + +> [!NOTE] +> Note: If both command-line arguments and environment variables are provided, command-line arguments take precedence. + +## Port Behavior 🔌 + +### Protocol Port (default: 7000) + +The bidirectional communication port for `stdin` and `stdout`: + +- **Single client**: Only one active connection at a time +- **Bidirectional**: Receives `stdin` from client, sends `stdout` to client +- **Buffered replay**: New clients receive all buffered `stdout` before real-time data +- **Terminates on disconnect**: Child process is killed when the client disconnects + +### Stderr Port (default: 7001) + +The unidirectional port for `stderr` output: + +- **Single client**: Only one active connection at a time +- **Reconnectable**: New clients can connect after previous ones disconnect +- **Buffered replay**: New clients receive all buffered `stderr`, including data produced while disconnected +- **No termination**: Child process continues running when clients disconnect + +### Health Port (default: 7002) + +The readiness check endpoint: + +- **Immediately closed**: Connections are accepted and immediately closed +- **Readiness indicator**: Successful TCP connection means forwarder is ready +- **Non-interfering**: Health checks don’t affect protocol or `stderr` ports + +## Testing + +Run the full test suite: + +```bash +cargo test +``` + +## Development + +This project was developed with AI assistance using GitHub Copilot and Claude Sonnet 4.5. + +## License + +See LICENSE file for details. diff --git a/src/app.rs b/src/app.rs new file mode 100644 index 0000000..0fd5e09 --- /dev/null +++ b/src/app.rs @@ -0,0 +1,75 @@ +use std::{ + net::TcpListener, + sync::{Arc, mpsc}, + thread::{self, JoinHandle}, +}; + +use crate::{ + args::Args, + child::StartedChild, + control::{ControlMessage, run_child_coordinator}, + output::{NotifyableOutputState, pump_output_to_state}, + servers::{health::health_server, protocol::protocol_server, stderr::stderr_server}, +}; + +pub fn run(args: Args) -> Result<(), anyhow::Error> { + let protocol_listener = TcpListener::bind(("0.0.0.0", args.protocol_port))?; + let stderr_listener = TcpListener::bind(("0.0.0.0", args.stderr_port))?; + let health_listener = TcpListener::bind(("0.0.0.0", args.health_port))?; + + let child = StartedChild::start(&args.command, &args.args)?; + + let stdout_state = Arc::new(NotifyableOutputState::new()); + let stderr_state = Arc::new(NotifyableOutputState::new()); + + let (control_tx, control_rx) = mpsc::channel::(); + + { + let stdout_state = Arc::clone(&stdout_state); + thread::spawn(move || { + let _ = pump_output_to_state(child.stdout, stdout_state, "stdout"); + }); + } + + { + let stderr_state = Arc::clone(&stderr_state); + thread::spawn(move || { + let _ = pump_output_to_state(child.stderr, stderr_state, "stderr"); + }); + } + + { + let stdout_state = Arc::clone(&stdout_state); + let control_tx = control_tx.clone(); + thread::spawn(move || { + let _ = protocol_server(protocol_listener, stdout_state, child.stdin, control_tx); + }); + } + + { + let stderr_state = Arc::clone(&stderr_state); + let control_tx = control_tx.clone(); + thread::spawn(move || { + let _ = stderr_server(stderr_listener, stderr_state, control_tx); + }); + } + + // We drop the `control_tx` object here so that the main thread is no longer an owner of it + // and thus is not taken into account when determining whether the channel is disconnected. + drop(control_tx); + + { + thread::spawn(move || { + let _ = health_server(health_listener); + }); + } + + let coordinator_thread: JoinHandle> = + thread::spawn(move || run_child_coordinator(child.job, control_rx)); + + coordinator_thread + .join() + .expect("Failed to join coordinator thread")?; + + Ok(()) +} diff --git a/src/args.rs b/src/args.rs new file mode 100644 index 0000000..44b1cea --- /dev/null +++ b/src/args.rs @@ -0,0 +1,170 @@ +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +pub struct Args { + /// The port to use for forwarding stdin and stdout. + #[arg(long, env = "STDIOXIDE_PROTOCOL_PORT", default_value_t = 7000)] + pub protocol_port: u16, + + /// The port to use for forwarding stderr. + #[arg(long, env = "STDIOXIDE_STDERR_PORT", default_value_t = 7001)] + pub stderr_port: u16, + + /// The port to use for health checks. + #[arg(long, env = "STDIOXIDE_HEALTH_PORT", default_value_t = 7002)] + pub health_port: u16, + + /// The command to run as a subprocess. + #[arg(required = true)] + pub command: String, + + /// The arguments to pass to the command. + #[arg(trailing_var_arg = true, allow_hyphen_values = true, num_args = 0..)] + pub args: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + // Mutex to serialize tests that modify environment variables. + // This prevents interference between parallel test execution. + static ENV_MUTEX: Mutex<()> = Mutex::new(()); + + /// RAII helper for temporarily setting environment variables in tests. + /// Automatically restores the original state when dropped. + struct EnvVar { + key: String, + original_value: Option, + } + + impl EnvVar { + /// Set an environment variable temporarily. + fn set(key: impl Into, value: impl AsRef) -> Self { + let key = key.into(); + let original_value = std::env::var(&key).ok(); + unsafe { + std::env::set_var(&key, value.as_ref()); + } + Self { + key, + original_value, + } + } + } + + impl Drop for EnvVar { + fn drop(&mut self) { + unsafe { + match &self.original_value { + Some(value) => std::env::set_var(&self.key, value), + None => std::env::remove_var(&self.key), + } + } + } + } + + #[test] + fn test_default_port_values() { + let _lock = ENV_MUTEX.lock().unwrap(); + let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + assert_eq!(args.protocol_port, 7000); + assert_eq!(args.stderr_port, 7001); + assert_eq!(args.health_port, 7002); + } + + #[test] + fn test_custom_port_values_via_args() { + let args = Args::try_parse_from([ + "stdioxide", + "--protocol-port", + "8000", + "--stderr-port", + "8001", + "--health-port", + "8002", + "echo", + ]) + .unwrap(); + assert_eq!(args.protocol_port, 8000); + assert_eq!(args.stderr_port, 8001); + assert_eq!(args.health_port, 8002); + } + + #[test] + fn test_command_and_args() { + let args = Args::try_parse_from(["stdioxide", "python", "-m", "http.server"]).unwrap(); + assert_eq!(args.command, "python"); + assert_eq!(args.args, vec!["-m", "http.server"]); + } + + #[test] + fn test_args_with_hyphen_values() { + let args = Args::try_parse_from(["stdioxide", "myapp", "--flag", "-value"]).unwrap(); + assert_eq!(args.command, "myapp"); + assert_eq!(args.args, vec!["--flag", "-value"]); + } + + #[test] + fn test_empty_args() { + let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + assert_eq!(args.command, "echo"); + assert!(args.args.is_empty()); + } + + #[test] + fn test_missing_command_fails() { + let result = Args::try_parse_from(["stdioxide"]); + assert!(result.is_err()); + } + + #[test] + fn test_env_var_protocol_port() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _env = EnvVar::set("STDIOXIDE_PROTOCOL_PORT", "9000"); + let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + assert_eq!(args.protocol_port, 9000); + } + + #[test] + fn test_env_var_stderr_port() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _env = EnvVar::set("STDIOXIDE_STDERR_PORT", "9001"); + let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + assert_eq!(args.stderr_port, 9001); + } + + #[test] + fn test_env_var_health_port() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _env = EnvVar::set("STDIOXIDE_HEALTH_PORT", "9002"); + let args = Args::try_parse_from(["stdioxide", "echo"]).unwrap(); + assert_eq!(args.health_port, 9002); + } + + #[test] + fn test_cli_args_override_env_vars() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _env1 = EnvVar::set("STDIOXIDE_PROTOCOL_PORT", "9000"); + let _env2 = EnvVar::set("STDIOXIDE_STDERR_PORT", "9001"); + let _env3 = EnvVar::set("STDIOXIDE_HEALTH_PORT", "9002"); + + let args = Args::try_parse_from([ + "stdioxide", + "--protocol-port", + "8000", + "--stderr-port", + "8001", + "--health-port", + "8002", + "echo", + ]) + .unwrap(); + + assert_eq!(args.protocol_port, 8000); + assert_eq!(args.stderr_port, 8001); + assert_eq!(args.health_port, 8002); + } +} diff --git a/src/child.rs b/src/child.rs new file mode 100644 index 0000000..f1170f9 --- /dev/null +++ b/src/child.rs @@ -0,0 +1,40 @@ +use subprocess::{Exec, Job, Redirection}; + +pub struct StartedChild { + pub job: Job, + pub stdin: std::fs::File, + pub stdout: std::fs::File, + pub stderr: std::fs::File, +} + +impl StartedChild { + pub fn start(command: &str, args: &[String]) -> Result { + let mut process = Exec::cmd(command); + for arg in args { + process = process.arg(arg); + } + let mut job = process + .stdin(Redirection::Pipe) + .stdout(Redirection::Pipe) + .stderr(Redirection::Pipe) + .start()?; + let child_stdin = job + .stdin + .take() + .ok_or_else(|| anyhow::anyhow!("Failed to capture child stdin"))?; + let child_stdout = job + .stdout + .take() + .ok_or_else(|| anyhow::anyhow!("Failed to capture child stdout"))?; + let child_stderr = job + .stderr + .take() + .ok_or_else(|| anyhow::anyhow!("Failed to capture child stderr"))?; + Ok(Self { + job, + stdin: child_stdin, + stdout: child_stdout, + stderr: child_stderr, + }) + } +} diff --git a/src/control.rs b/src/control.rs new file mode 100644 index 0000000..fe0d4f9 --- /dev/null +++ b/src/control.rs @@ -0,0 +1,40 @@ +use std::sync::mpsc; + +use subprocess::Job; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ControlMessage { + KillChild, +} + +pub fn run_child_coordinator( + job: Job, + control_rx: mpsc::Receiver, +) -> Result<(), anyhow::Error> { + loop { + if let Some(status) = job.poll() { + eprintln!("Child process exited with status: {status}"); + return Ok(()); + } + + match control_rx.recv_timeout(std::time::Duration::from_millis(100)) { + Ok(ControlMessage::KillChild) => { + let _ = job.kill(); + let status = job.wait()?; + eprintln!("Child process killed; exit status: {status}"); + return Ok(()); + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // Just poll again. + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + // All senders are gone; we terminate the child process and exit. + eprintln!("Control channel disconnected; terminating child process"); + let _ = job.kill(); + let status = job.wait()?; + eprintln!("Child process killed; exit status: {status}"); + return Ok(()); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5be80d2 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,6 @@ +pub mod app; +pub mod args; +pub mod child; +pub mod control; +pub mod output; +pub mod servers; diff --git a/src/main.rs b/src/main.rs index e7a11a9..8b925df 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,7 @@ -fn main() { - println!("Hello, world!"); +use clap::Parser; +use stdioxide::{app, args::Args}; + +fn main() -> Result<(), anyhow::Error> { + let args = Args::parse(); + app::run(args) } diff --git a/src/output.rs b/src/output.rs new file mode 100644 index 0000000..5aab68b --- /dev/null +++ b/src/output.rs @@ -0,0 +1,215 @@ +use std::{ + io::{Read, Write}, + net::TcpStream, + sync::{ + Arc, Condvar, Mutex, + atomic::{AtomicBool, Ordering}, + mpsc, + }, +}; + +use crate::control::ControlMessage; + +#[derive(Debug, Clone)] +pub enum ServingBehavior { + KillChildOnDisconnect, + DoNotKillChildOnDisconnect(Arc), +} + +pub struct OutputState { + pub buffer: Vec, + pub eof: bool, +} + +pub struct NotifyableOutputState { + pub state: Mutex, + pub condition_variable: Condvar, +} + +impl NotifyableOutputState { + pub fn new() -> Self { + Self::default() + } +} + +impl Default for NotifyableOutputState { + fn default() -> Self { + Self { + state: Mutex::new(OutputState { + buffer: Vec::new(), + eof: false, + }), + condition_variable: Condvar::new(), + } + } +} + +/// Pumps data from the given `source` (either `stdout` or `stderr` of the child process) into the shared `state`. +pub fn pump_output_to_state( + mut source: impl Read, + output_state: Arc, + label: &'static str, +) -> Result<(), anyhow::Error> { + loop { + let mut buffer = [0u8; 8192]; + let num_bytes_read = source.read(&mut buffer)?; + let mut guard = output_state + .state + .lock() + .expect("Failed to lock output state"); + + if num_bytes_read == 0 { + eprintln!("[{label}] EOF reached"); + guard.eof = true; + output_state.condition_variable.notify_all(); + break; + } + + let chunk = &buffer[..num_bytes_read]; + guard.buffer.extend_from_slice(chunk); + output_state.condition_variable.notify_all(); + } + + Ok(()) +} + +pub fn serve_output_on_stream( + mut stream: TcpStream, + output_state: Arc, + control_tx: mpsc::Sender, + serving_behavior: ServingBehavior, + label: &'static str, +) -> Result<(), anyhow::Error> { + loop { + let buffered_data = { + let mut guard = output_state + .state + .lock() + .expect("Failed to lock stdout state"); + + while guard.buffer.is_empty() && !guard.eof { + // Wait until there’s either new output to send or we’ve reached EOF. + guard = output_state + .condition_variable + .wait(guard) + .expect("Failed to wait on condition variable"); + } + + if guard.buffer.is_empty() && guard.eof { + eprintln!( + "[{label}] EOF reached and no buffered output; closing client connection" + ); + return Ok(()); + } + + // Clone the buffered data to avoid holding the lock while writing to the stream, + // which could potentially block for a long time if the client is slow to read. + guard.buffer.clone() + }; + + let mut num_bytes_written = 0; + + while num_bytes_written < buffered_data.len() { + match stream.write(&buffered_data[num_bytes_written..]) { + Ok(0) => { + // Treat as connection no longer writable. + break; + } + Ok(n) => { + num_bytes_written += n; + } + Err(e) if e.kind() == std::io::ErrorKind::Interrupted => { + // Interrupted by a signal, just retry. + continue; + } + Err(_) => { + // Any other error is treated as the connection being no longer writable. + break; + } + } + } + + // Before draining the buffer, check if the connection is still active (for `stderr` reconnect support). + // If the read monitoring thread detected a disconnect, we should NOT drain the buffer to prevent data loss. + if let ServingBehavior::DoNotKillChildOnDisconnect(ref active) = serving_behavior + && !active.load(Ordering::Acquire) + { + eprintln!( + "[{label}] Connection no longer active (detected by monitoring thread); exiting without draining buffer to prevent data loss" + ); + return Ok(()); + } + + let mut guard = output_state + .state + .lock() + .expect("Failed to lock stdout state"); + + // Since we copied the buffer, there may have been new output produced while we were writing to the stream. We + // only remove the number of bytes that we successfully wrote, so that any new output will still be in the buffer + // for the next iteration. + guard.buffer.drain(..num_bytes_written); + + if num_bytes_written < buffered_data.len() { + // Something went wrong while writing to the stream, and we weren’t able to write all the buffered data. + // We treat this as the connection being no longer writable and exit the loop (and potentially kill the + // child process, depending on the serving behavior). + if matches!(serving_behavior, ServingBehavior::KillChildOnDisconnect) { + let _ = control_tx.send(ControlMessage::KillChild); + } + return Ok(()); + } + + if guard.eof && guard.buffer.is_empty() { + eprintln!("[{label}] EOF reached; closing client connection"); + return Ok(()); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; + + #[test] + fn test_pump_output_to_state_empty_input() { + let state = Arc::new(NotifyableOutputState::new()); + let input = Cursor::new(Vec::::new()); + + let result = pump_output_to_state(input, Arc::clone(&state), "test"); + assert!(result.is_ok()); + + let guard = state.state.lock().unwrap(); + assert!(guard.buffer.is_empty()); + assert!(guard.eof); + } + + #[test] + fn test_pump_output_to_state_single_chunk() { + let state = Arc::new(NotifyableOutputState::new()); + let data = b"Hello, World!"; + let input = Cursor::new(data.to_vec()); + + let result = pump_output_to_state(input, Arc::clone(&state), "test"); + assert!(result.is_ok()); + + let guard = state.state.lock().unwrap(); + assert_eq!(guard.buffer, data); + assert!(guard.eof); + } + + #[test] + fn test_pump_output_to_state_multiple_chunks() { + let state = Arc::new(NotifyableOutputState::new()); + let data = vec![0u8; 16384]; // Larger than buffer size (8192). + let input = Cursor::new(data.clone()); + + let result = pump_output_to_state(input, Arc::clone(&state), "test"); + assert!(result.is_ok()); + + let guard = state.state.lock().unwrap(); + assert_eq!(guard.buffer, data); + assert!(guard.eof); + } +} diff --git a/src/servers/health.rs b/src/servers/health.rs new file mode 100644 index 0000000..3fba5d3 --- /dev/null +++ b/src/servers/health.rs @@ -0,0 +1,18 @@ +use std::net::TcpListener; + +/// Waits for clients to connect on the `health` port, and immediately drops any connections. The existence +/// of a successful connection is used by the client as a health check for whether the process is alive. +pub fn health_server(listener: TcpListener) -> Result<(), anyhow::Error> { + for stream in listener.incoming() { + match stream { + Ok(_stream) => { + // Immediately drop it; successful connect is enough. + } + Err(e) => { + eprintln!("[health] accept failed: {e}"); + } + } + } + + Ok(()) +} diff --git a/src/servers/mod.rs b/src/servers/mod.rs new file mode 100644 index 0000000..732f3e0 --- /dev/null +++ b/src/servers/mod.rs @@ -0,0 +1,3 @@ +pub mod health; +pub mod protocol; +pub mod stderr; diff --git a/src/servers/protocol.rs b/src/servers/protocol.rs new file mode 100644 index 0000000..6e989c2 --- /dev/null +++ b/src/servers/protocol.rs @@ -0,0 +1,89 @@ +use std::{ + io::{Read, Write}, + net::{TcpListener, TcpStream}, + sync::{Arc, mpsc}, + thread, +}; + +use crate::{ + control::ControlMessage, + output::{NotifyableOutputState, ServingBehavior, serve_output_on_stream}, +}; + +fn forward_stream_data_to_child_process( + mut stream: TcpStream, + mut child_stdin: std::fs::File, + control_tx: mpsc::Sender, +) -> Result<(), anyhow::Error> { + let mut read_buffer = [0u8; 8192]; + loop { + let num_bytes_read = match stream.read(&mut read_buffer) { + Ok(0) => { + eprintln!("[protocol] client disconnected; terminating child process"); + let _ = control_tx.send(ControlMessage::KillChild); + return Ok(()); + } + Ok(n) => n, + Err(e) => { + let _ = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!("Failed to read from protocol client: {e}")); + } + }; + + if let Err(e) = child_stdin.write_all(&read_buffer[..num_bytes_read]) { + let _ = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!("Failed to write to child stdin: {e}")); + } + if let Err(e) = child_stdin.flush() { + let _ = control_tx.send(ControlMessage::KillChild); + return Err(anyhow::anyhow!("Failed to flush child stdin: {e}")); + } + } +} + +/// Waits for the first client to connect on the protocol port, then forwards data between that +/// client and the child process. This function spawns two threads: one for forwarding data from +/// the client to the child process’s `stdin`, and another for forwarding data from the child +/// process’s `stdout` to the client. +pub fn protocol_server( + listener: TcpListener, + stdout_state: Arc, + child_stdin: std::fs::File, + control_tx: mpsc::Sender, +) -> Result<(), anyhow::Error> { + // We only accept a single (i.e., the first) client connection on the protocol port. + // When the client disconnects, we terminate the child process and exit the server. + let (stdin_thread, stdout_thread) = match listener.accept() { + Ok((stream, address)) => { + eprintln!("[protocol] client connected from {address}"); + let cloned_stream = stream.try_clone()?; + let cloned_control_tx = control_tx.clone(); + ( + thread::spawn(move || { + let _ = forward_stream_data_to_child_process( + cloned_stream, + child_stdin, + cloned_control_tx, + ); + }), + thread::spawn(move || { + let _ = serve_output_on_stream( + stream, + Arc::clone(&stdout_state), + control_tx, + ServingBehavior::KillChildOnDisconnect, + "protocol", + ); + }), + ) + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to accept client connection: {e}")); + } + }; + + stdin_thread.join().expect("Failed to join stdin thread"); + stdout_thread.join().expect("Failed to join stdout thread"); + + Ok(()) +} diff --git a/src/servers/stderr.rs b/src/servers/stderr.rs new file mode 100644 index 0000000..379ac61 --- /dev/null +++ b/src/servers/stderr.rs @@ -0,0 +1,121 @@ +use std::{ + io::Read, + net::{TcpListener, TcpStream}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + mpsc, + }, + thread, +}; + +use crate::{ + control::ControlMessage, + output::{NotifyableOutputState, ServingBehavior, serve_output_on_stream}, +}; + +/// Monitors a client connection for disconnection by attempting to read from the socket. +/// Since `stderr` clients should not send data, any readability indicates disconnection (EOF). +/// When disconnection is detected, the atomic flag is cleared to allow new connections. +fn monitor_stderr_client_connection( + mut stream: TcpStream, + has_active_connection: Arc, +) -> Result<(), anyhow::Error> { + let mut read_buffer = [0u8; 1]; + loop { + match stream.read(&mut read_buffer) { + Ok(0) => { + // EOF; client disconnected gracefully. + eprintln!("[stderr] client disconnect detected"); + has_active_connection.store(false, Ordering::Release); + return Ok(()); + } + Ok(_) => { + // Ignore any data sent by client (unexpected but harmless). + } + Err(e) => { + // Error reading; treat as disconnection. + eprintln!("[stderr] read error (client likely disconnected): {e}"); + has_active_connection.store(false, Ordering::Release); + return Err(anyhow::anyhow!("Failed to read from stderr client: {e}")); + } + } + } +} + +/// Waits for clients to connect on the `stderr` port, and serves the child process’s `stderr` output +/// to the first client that connects. If that client disconnects, we wait for the next client to connect +/// and serve the current `stderr` output to them instead, and so on. The function spawns two threads +/// for each client connection: one for monitoring disconnection and one for writing output. +pub fn stderr_server( + listener: TcpListener, + stderr_state: Arc, + control_tx: mpsc::Sender, +) -> Result<(), anyhow::Error> { + // We allow reconnects on the `stderr` port, but only one client at a time. When a client disconnects, + // we simply wait for the next one to connect. + let has_active_connection = Arc::new(AtomicBool::new(false)); + for stream in listener.incoming() { + match stream { + Ok(stream) => { + if has_active_connection + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // Atomic value has been successfully changed from `false` to `true`. + eprintln!("[stderr] client connected from {}", stream.peer_addr()?); + + let connection_monitoring_stream = match stream.try_clone() { + Ok(s) => s, + Err(e) => { + eprintln!("[stderr] failed to clone stream: {e}"); + has_active_connection.store(false, Ordering::Release); + continue; + } + }; + + let has_active_connection_clone = Arc::clone(&has_active_connection); + let has_active_connection_monitor = Arc::clone(&has_active_connection); + let has_active_connection_write = Arc::clone(&has_active_connection); + let stderr_state = Arc::clone(&stderr_state); + let control_tx = control_tx.clone(); + + // Spawn read monitoring thread to detect disconnection proactively. + thread::spawn(move || { + let _ = monitor_stderr_client_connection( + connection_monitoring_stream, + has_active_connection_monitor, + ); + }); + + // Spawn write thread to serve stderr output. + thread::spawn(move || { + let _ = serve_output_on_stream( + stream, + stderr_state, + control_tx, + ServingBehavior::DoNotKillChildOnDisconnect(Arc::clone( + &has_active_connection_write, + )), + "stderr", + ); + // When the write thread finishes, also clear the connection flag + // (idempotent if the read thread already did this). + has_active_connection_clone.store(false, Ordering::Release); + }); + } else { + // Atomic value was already `true`, so there is already an active connection. + eprintln!( + "[stderr] client connected from {}, but another client is already connected; rejecting connection", + stream.peer_addr()? + ); + } + } + Err(e) => { + eprintln!("[stderr] accept failed: {e}"); + } + } + } + + Ok(()) +} diff --git a/tests/integration_test.rs b/tests/integration_test.rs new file mode 100644 index 0000000..a7a2c75 --- /dev/null +++ b/tests/integration_test.rs @@ -0,0 +1,1243 @@ +use std::{ + collections::HashSet, + io::{Read, Write}, + net::{TcpListener, TcpStream}, + process::{Child, Command, Stdio}, + sync::{LazyLock, Mutex}, + thread, + time::Duration, +}; + +use crate::lsp_client::LspClient; + +mod test_utils; +use test_utils::*; + +// Global registry tracking all currently allocated ports across all tests used so +// that tests can allocate ports in parallel as long as they don’t conflict. +static ALLOCATED_PORTS_REGISTRY: LazyLock>> = + LazyLock::new(|| Mutex::new(HashSet::new())); + +/// Find an available port by binding to port 0 and letting the OS assign one. +/// Returns the port number that was assigned. +/// Never returns default ports (7000, 7001, 7002) to avoid conflicts with `test_default_port_values()`. +fn find_available_port() -> u16 { + loop { + // Note: This function binds to a port to find out whether it’s available. But this exposes a race + // condition if actions happen in the following order: + // 1. `find_available_port()` binds to a port and thus considers it as being available. + // 2. `find_available_port()` drops the listener, making the port available again. + // 3. Another test tries to find an available port. `find_available_port()` might bind to the same + // port again. + // 4. While the port is bound, the first test tries to bind the `TestForwarder` to the same port. + // This will fail because the port is currently blocked by `find_available_port()` running + // in the context of the second test. + // 5. The first test fails even though the second test is about to free that port again. + // To mitigate this, the `TestForwarder::start()` function has a retry mechanism when + // spawning the forwarder, so if it fails to bind to the port, it will try again with a different + // shortly after. + let port = TcpListener::bind("127.0.0.1:0") + .expect("Failed to bind to find available port") + .local_addr() + .expect("Failed to get local address") + .port(); + + // Skip default ports to avoid conflicts with `test_default_port_values()`. + if !(7000..=7002).contains(&port) { + return port; + } + } +} + +/// RAII guard that removes ports from the global registry when dropped. +struct PortGuard { + ports: [u16; 3], +} + +impl Drop for PortGuard { + fn drop(&mut self) { + let mut registry = ALLOCATED_PORTS_REGISTRY.lock().unwrap(); + for port in &self.ports { + registry.remove(port); + } + } +} + +/// RAII guard for allocated ports. +/// Registers ports in the global registry to prevent other tests from allocating them. +/// Ports are automatically released when this struct is dropped. +/// Not `Copy` or `Clone`: ownership ensures exclusivity. +struct AllocatedPorts { + protocol_port: u16, + stderr_port: u16, + health_port: u16, + _guard: PortGuard, +} + +impl AllocatedPorts { + /// Allocate three available ports and register them globally. + /// Retries if the OS-assigned ports are already allocated by another test. + /// Multiple tests can allocate ports concurrently without blocking each other. + fn new() -> Self { + loop { + // Find candidate ports from the OS + let p1 = find_available_port(); + let p2 = find_available_port(); + let p3 = find_available_port(); + + // `find_available_port()` could return a port value that was previously found out to + // be free in the context of a different test, but has not been bound yet. However, it would + // still be in the registry. Therefore, we have to check against the registry to ensure + // that such a port is *really* available. + + // Try to reserve them in the global registry. + let mut registry = ALLOCATED_PORTS_REGISTRY.lock().unwrap(); + if !registry.contains(&p1) && !registry.contains(&p2) && !registry.contains(&p3) { + registry.insert(p1); + registry.insert(p2); + registry.insert(p3); + + return Self { + protocol_port: p1, + stderr_port: p2, + health_port: p3, + _guard: PortGuard { + ports: [p1, p2, p3], + }, + }; + } + // If any port is already allocated, try again + } + } + + fn protocol_port(&self) -> u16 { + self.protocol_port + } + + fn stderr_port(&self) -> u16 { + self.stderr_port + } + + fn health_port(&self) -> u16 { + self.health_port + } +} + +/// Helper struct to manage a `stdioxide` process for testing. +/// Automatically cleans up the process when dropped. +/// Owns the allocated ports to keep them reserved in the global registry +/// for the entire lifetime of the forwarder. +struct TestForwarder { + process: Child, + ports: AllocatedPorts, +} + +impl TestForwarder { + /// Start a new `stdioxide` forwarder with the given command and arguments. + /// Automatically allocates unique available ports for this test. + /// Retries if port binding fails (e.g., if a port was grabbed between discovery and binding). + fn start(command: &str, args: &[&str]) -> Self { + const MAX_RETRIES: usize = 3; + let mut last_error = None; + + for attempt in 0..MAX_RETRIES { + // Allocate ports--registered in global registry. + let ports = AllocatedPorts::new(); + + // Try to start with these ports. + match Self::try_start_with_ports(command, args, ports) { + Ok(forwarder) => return forwarder, + Err(e) => { + last_error = Some(e); + if attempt < MAX_RETRIES - 1 { + // Retry with new ports + thread::sleep(Duration::from_millis(100)); + continue; + } + } + } + } + + panic!( + "Failed to start forwarder after {} attempts. Last error: {}", + MAX_RETRIES, + last_error.unwrap() + ); + } + + /// Try to start a new `stdioxide` forwarder with pre-allocated ports. + /// Returns an error if spawning fails or the forwarder doesn’t become ready. + fn try_start_with_ports( + command: &str, + args: &[&str], + ports: AllocatedPorts, + ) -> Result { + let process = Self::spawn_process(command, args, &ports) + .map_err(|e| format!("Failed to spawn process: {}", e))?; + + let forwarder = Self { process, ports }; + + // Wait for the forwarder to bind to the ports + forwarder + .try_wait_for_ready() + .map_err(|e| format!("Failed to become ready: {}", e))?; + + // Ports remain in the forwarder and will be released when it’s dropped. + Ok(forwarder) + } + + /// Internal helper to spawn the forwarder process. + /// Returns the spawned Child process or an error if spawning fails (e.g., due to port conflicts). + fn spawn_process( + command: &str, + args: &[&str], + ports: &AllocatedPorts, + ) -> std::io::Result { + // Get the path to the `stdioxide` binary. + // In integration tests, we need to use the binary from the target directory. + let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") + .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + + let mut cmd = Command::new(&bin_path); + cmd.arg("--protocol-port") + .arg(ports.protocol_port().to_string()) + .arg("--stderr-port") + .arg(ports.stderr_port().to_string()) + .arg("--health-port") + .arg(ports.health_port().to_string()) + .arg(command); + + for arg in args { + cmd.arg(arg); + } + + cmd.stderr(Stdio::piped()); + cmd.stdout(Stdio::piped()); + + cmd.spawn() + } + + /// Try to wait for the forwarder to be ready by attempting to connect to the health port. + /// Returns an error if the forwarder doesn’t become ready in time. + fn try_wait_for_ready(&self) -> Result<(), String> { + const NUM_ATTEMPTS: usize = 30; + let mut last_error = None; + + for attempt in 0..NUM_ATTEMPTS { + match TcpStream::connect_timeout( + &format!("127.0.0.1:{}", self.ports.health_port()) + .parse() + .unwrap(), + Duration::from_millis(100), + ) { + Ok(_) => return Ok(()), + Err(e) => { + last_error = Some(e); + if attempt < NUM_ATTEMPTS - 1 { + thread::sleep(Duration::from_millis(50)); + } + } + } + } + + Err(format!( + "Forwarder did not become ready in time on port {}. Last error: {}", + self.ports.health_port(), + last_error.unwrap() + )) + } + + /// Connect to the protocol port. + fn connect_protocol(&self) -> TcpStream { + self.connect_with_retry(self.ports.protocol_port(), "protocol") + } + + /// Connect to the `stderr` port. + fn connect_stderr(&self) -> TcpStream { + self.connect_with_retry(self.ports.stderr_port(), "stderr") + } + + /// Connect to the health port. + fn connect_health(&self) -> TcpStream { + self.connect_with_retry(self.ports.health_port(), "health") + } + + /// Connect to a port with retries. + fn connect_with_retry(&self, port: u16, label: &str) -> TcpStream { + const NUM_ATTEMPTS: usize = 20; + for attempt in 0..NUM_ATTEMPTS { + match TcpStream::connect(("127.0.0.1", port)) { + Ok(stream) => return stream, + Err(e) if attempt == NUM_ATTEMPTS - 1 => { + panic!("Failed to connect to {} port {}: {}", label, port, e); + } + Err(_) => { + thread::sleep(Duration::from_millis(50)); + } + } + } + unreachable!() + } + + /// Wait for the forwarder process to exit. + fn wait_for_exit(&mut self) -> bool { + const NUM_ATTEMPTS: usize = 50; + for _ in 0..NUM_ATTEMPTS { + if let Ok(Some(_)) = self.process.try_wait() { + return true; + } + thread::sleep(Duration::from_millis(100)); + } + false + } +} + +impl Drop for TestForwarder { + fn drop(&mut self) { + // Clean up: kill the process if it’s still running. + let _ = self.process.kill(); + let _ = self.process.wait(); + } +} + +/// Helper function to read from a stream with a timeout. +fn read_with_timeout(stream: &mut TcpStream, buffer: &mut [u8]) -> std::io::Result { + stream.set_read_timeout(Some(Duration::from_secs(5)))?; + stream.read(buffer) +} + +/// Helper function to read all available data from a stream up to a timeout. +fn read_all_available(stream: &mut TcpStream, timeout: Duration) -> Vec { + let mut result = Vec::new(); + let mut buffer = [0u8; 8192]; + stream + .set_read_timeout(Some(timeout)) + .expect("Failed to set read timeout"); + + loop { + match stream.read(&mut buffer) { + Ok(0) => break, + Ok(n) => result.extend_from_slice(&buffer[..n]), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, + Err(_) => break, + } + } + + result +} + +// ============================================================================ +// ACCEPTANCE CRITERIA TESTS +// ============================================================================ + +#[test] +fn test_forwarder_starts_arbitrary_child_process() { + // * [x] A standalone forwarder executable can be started that launches an arbitrary child process. + + let (cmd, args) = sleep_cmd(5); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // If we got here, the forwarder started successfully. + // The forwarder should be ready (health port should be accessible). + assert!(forwarder.connect_health().peer_addr().is_ok()); +} + +#[test] +fn test_forwarder_passes_arguments_unchanged() { + // * [x] The forwarder passes command-line arguments through to the child process unchanged. + + // Use a command that outputs arguments and then waits, so we have time to connect. + let (cmd, args) = echo_args_cmd(&["-n", "test", "with spaces", "--flag"]); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + let mut stream = forwarder.connect_protocol(); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + + // `bash` should output all the arguments. + let output_str = String::from_utf8_lossy(&output); + assert!(output_str.contains("test")); + assert!(output_str.contains("with spaces")); + assert!(output_str.contains("--flag")); +} + +#[test] +fn test_child_process_configurable_externally() { + // * [x] The child process to launch can be configured externally. + + // Test with different commands to verify external configuration works. + let (cmd1, args1) = echo_with_sleep_cmd("first", 5); + let args1_refs: Vec<&str> = args1.iter().map(|s| s.as_str()).collect(); + let forwarder1 = TestForwarder::start(cmd1, &args1_refs); + let mut stream1 = forwarder1.connect_protocol(); + let output1 = read_all_available(&mut stream1, Duration::from_millis(500)); + assert!(String::from_utf8_lossy(&output1).contains("first")); + + let (cmd2, args2) = echo_with_sleep_cmd("second", 5); + let args2_refs: Vec<&str> = args2.iter().map(|s| s.as_str()).collect(); + let forwarder2 = TestForwarder::start(cmd2, &args2_refs); + let mut stream2 = forwarder2.connect_protocol(); + let output2 = read_all_available(&mut stream2, Duration::from_millis(500)); + assert!(String::from_utf8_lossy(&output2).contains("second")); +} + +#[test] +fn test_forwarder_exits_when_child_exits() { + // * [x] When the child process exits for any reason, the forwarder also terminates. + + // Use a command that runs briefly and then exits. + let (cmd, args) = short_lived_cmd("test", 0); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let mut forwarder = TestForwarder::start(cmd, &args_refs); + + // Connect to protocol port to ensure we’re monitoring the forwarder. + let _stream = forwarder.connect_protocol(); + + // Wait for the child to exit (should happen after ~0.5s). + thread::sleep(Duration::from_millis(600)); + + // Forwarder should have exited by now. + assert!( + forwarder.wait_for_exit(), + "Forwarder should exit when child exits" + ); +} + +#[test] +fn test_forwarder_exposes_three_tcp_ports() { + // * [x] The forwarder exposes three TCP ports: + // * [x] a **protocol port** + // * [x] an **stderr port** + // * [x] a **health port** + + let (cmd, args) = sleep_cmd(10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Verify all three ports are accessible. + // Note: The `connect_*()` methods already `panic!()` if connection fails, so the real + // accessibility check happens *there*. The `.peer_addr().is_ok()` is redundant but + // serves as documentation. + assert!( + forwarder.connect_protocol().peer_addr().is_ok(), + "Protocol port should be accessible" + ); + assert!( + forwarder.connect_stderr().peer_addr().is_ok(), + "Stderr port should be accessible" + ); + assert!( + forwarder.connect_health().peer_addr().is_ok(), + "Health port should be accessible" + ); +} + +#[test] +fn test_default_port_values() { + // * [x] The default port values are: + // * [x] `7000` for the protocol port + // * [x] `7001` for the stderr port + // * [x] `7002` for the health port + + // Test setup: Ensure default ports are available. + // If they’re not, this is an environment issue, not a test failure. + for port in 7000..=7002 { + let error_message = format!( + "TEST SETUP FAILED: Default port {port} is not available. This is an environment issue, not a test failure." + ); + let _ = TcpListener::bind(format!("127.0.0.1:{port}")).expect(&error_message); + // Port is immediately released here. + } + + // Launch `stdioxide` *without* specifying ports to verify it uses the defaults. + let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") + .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + + let (sleep_command, sleep_args) = sleep_cmd(10); + let mut cmd = Command::new(&bin_path); + cmd.arg(sleep_command); + for arg in sleep_args { + cmd.arg(arg); + } + cmd.stderr(Stdio::piped()); + cmd.stdout(Stdio::piped()); + + let mut process = cmd.spawn().expect("Failed to start stdioxide"); + + // Wait for the forwarder to be ready by connecting to the default health port. + let mut connected = false; + const NUM_ATTEMPTS: usize = 30; + for _ in 0..NUM_ATTEMPTS { + if TcpStream::connect_timeout( + &"127.0.0.1:7002".parse().unwrap(), + Duration::from_millis(100), + ) + .is_ok() + { + connected = true; + break; + } + thread::sleep(Duration::from_millis(50)); + } + assert!( + connected, + "Forwarder should be ready on default health port 7002" + ); + + // Verify we can connect to all three default ports. + for port in 7000..=7002 { + assert!( + TcpStream::connect(format!("127.0.0.1:{port}")).is_ok(), + "Should connect to default port {port}" + ); + } + + // Clean up. + let _ = process.kill(); + let _ = process.wait(); +} + +#[test] +fn test_port_override_via_environment_variables() { + // * [x] All three port numbers can be overridden via environment variables. + + // Allocate unique ports to avoid conflicts. + let ports = AllocatedPorts::new(); + let custom_protocol = ports.protocol_port(); + let custom_stderr = ports.stderr_port(); + let custom_health = ports.health_port(); + + // Launch `stdioxide` with environment variables (NOT command-line args) to test env var override. + let bin_path = std::env::var("CARGO_BIN_EXE_stdioxide") + .unwrap_or_else(|_| "target/debug/stdioxide".to_string()); + + let (sleep_command, sleep_args) = sleep_cmd(10); + let mut cmd = Command::new(&bin_path); + cmd.env("STDIOXIDE_PROTOCOL_PORT", custom_protocol.to_string()) + .env("STDIOXIDE_STDERR_PORT", custom_stderr.to_string()) + .env("STDIOXIDE_HEALTH_PORT", custom_health.to_string()) + .arg(sleep_command); + for arg in sleep_args { + cmd.arg(arg); + } + cmd.stderr(Stdio::piped()).stdout(Stdio::piped()); + + let mut process = cmd.spawn().expect("Failed to start stdioxide"); + + // Wait for the forwarder to be ready by connecting to the custom health port. + let mut connected = false; + const NUM_ATTEMPTS: usize = 30; + for _ in 0..NUM_ATTEMPTS { + if TcpStream::connect_timeout( + &format!("127.0.0.1:{}", custom_health).parse().unwrap(), + Duration::from_millis(100), + ) + .is_ok() + { + connected = true; + break; + } + thread::sleep(Duration::from_millis(50)); + } + assert!( + connected, + "Forwarder should be ready on custom health port {}", + custom_health + ); + + // Verify we can connect to all three custom ports. + assert!( + TcpStream::connect(("127.0.0.1", custom_protocol)).is_ok(), + "Should connect to custom protocol port {custom_protocol}", + ); + assert!( + TcpStream::connect(("127.0.0.1", custom_stderr)).is_ok(), + "Should connect to custom stderr port {custom_stderr}", + ); + assert!( + TcpStream::connect(("127.0.0.1", custom_health)).is_ok(), + "Should connect to custom health port {custom_health}", + ); + + // Clean up. + let _ = process.kill(); + let _ = process.wait(); +} + +#[test] +fn test_stdout_sent_over_protocol_port() { + // * [x] The forwarder sends the child process’s `stdout` stream over the protocol port. + + let (cmd, args) = echo_with_sleep_cmd("Hello from stdout", 5); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + let mut stream = forwarder.connect_protocol(); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + let output_str = String::from_utf8_lossy(&output); + + assert!(output_str.contains("Hello from stdout")); +} + +#[test] +fn test_stdin_received_on_protocol_port() { + // * [x] The forwarder receives input for the child process’s `stdin` stream on the protocol port. + // * [x] Data received on the protocol port is forwarded to the child process’s `stdin` while the connection is active. + + let (cmd, args) = cat_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + let mut stream = forwarder.connect_protocol(); + + // Send data to `stdin` via the protocol port. + stream + .write_all(b"test input\n") + .expect("Failed to write to protocol port"); + stream.flush().expect("Failed to flush"); + + // Read back the echoed output from `stdout`. + // Increased timeout to account for PowerShell startup on Windows + let output = read_all_available(&mut stream, Duration::from_millis(1500)); + let output_str = String::from_utf8_lossy(&output); + + assert!(output_str.contains("test input")); +} + +#[test] +fn test_stderr_sent_over_stderr_port() { + // * [x] The forwarder sends the child process’s `stderr` stream over the `stderr` port. + + // Use a `bash` command that writes to `stderr` and then waits. + let (cmd, args) = stderr_echo_with_sleep_cmd("error message", 5); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + let mut stream = forwarder.connect_stderr(); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + let output_str = String::from_utf8_lossy(&output); + + assert!(output_str.contains("error message")); +} + +#[test] +fn test_protocol_port_single_client_only() { + // * [x] The protocol port allows at most one active client connection at a time. + + let (cmd, args) = loop_stdin_to_stdout_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // First client connects successfully. + let mut stream1 = forwarder.connect_protocol(); + + // Verify first client works. + stream1.write_all(b"test\n").expect("Failed to write"); + stream1.flush().expect("Failed to flush"); + // Increased timeout to account for PowerShell startup and buffering on Windows + let output = read_all_available(&mut stream1, Duration::from_millis(1500)); + assert!(String::from_utf8_lossy(&output).contains("response")); + + // Second client connection attempt. + // The protocol server only calls `accept()` once, so the second connection + // succeeds at the TCP level (queued in backlog) but is never accepted/served. + let mut stream2 = TcpStream::connect_timeout( + &format!("127.0.0.1:{}", forwarder.ports.protocol_port()) + .parse() + .unwrap(), + Duration::from_millis(200), + ) + .expect("Second client should connect successfully (TCP handshake completes)"); + + // The connection is established but never served--reading should timeout. + stream2 + .set_read_timeout(Some(Duration::from_millis(200))) + .expect("Should set read timeout"); + + let mut buf = [0u8; 100]; + let result = stream2.read(&mut buf); + + assert!( + result.is_err() + && matches!( + result.as_ref().unwrap_err().kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut + ), + "Second client should timeout reading (connection never served by protocol server)" + ); +} + +#[test] +fn test_stderr_port_single_client_only() { + // * [x] The `stderr` port allows at most one active client connection at a time. + + let (cmd, args) = continuous_stderr_loop_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // First client connects successfully. + let _stream1 = forwarder.connect_stderr(); + + // Second client should connect but be rejected. + // According to the `stderr_server` implementation, it rejects additional connections. + let stream2 = forwarder.connect_stderr(); + + // The second connection is made but immediately closed/rejected. + // Try to read--should get no data or connection closed. + let output = read_all_available( + &mut stream2.try_clone().unwrap(), + Duration::from_millis(500), + ); + + // The second client should not receive meaningful data since the first is still active. + // In practice, the connection is accepted but dropped, so we should see minimal/no data. + // This test verifies the single-client behavior. + assert!( + output.is_empty() || output.len() < 100, + "Second stderr client should not receive full stream data while first client is active" + ); +} + +#[test] +fn test_health_port_multiple_clients() { + // * [x] The health port allows multiple simultaneous client connections. + + let (cmd, args) = sleep_cmd(10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Connect multiple clients to the health port. + let _stream1 = forwarder.connect_health(); + let _stream2 = forwarder.connect_health(); + let _stream3 = forwarder.connect_health(); + + // All connections should succeed. + assert!( + _stream1.peer_addr().is_ok() + && _stream2.peer_addr().is_ok() + && _stream3.peer_addr().is_ok() + ); +} + +#[test] +fn test_protocol_port_buffered_stdout_replay() { + // * [x] When a client connects to the protocol port for the first time, it first receives all buffered `stdout` data + // produced before the connection was established. + // * [x] After the buffered `stdout` data has been sent, the client continues to receive newly produced `stdout` data + // in real time. + + // Use a script that produces output immediately and then waits. + let (cmd, args) = multi_echo_stdout_cmd("buffered output", 1.0, "realtime output", 10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Now connect - buffering ensures we receive output produced before connection. + thread::sleep(Duration::from_millis(100)); + + // Connect and we should receive the buffered output first. + let mut stream = forwarder.connect_protocol(); + + // Read the output. + let output = read_all_available(&mut stream, Duration::from_secs(3)); + let output_str = String::from_utf8_lossy(&output); + + // Verify we got both buffered and realtime output. + assert!(output_str.contains("buffered output")); + assert!(output_str.contains("realtime output")); +} + +#[test] +fn test_protocol_disconnect_kills_child() { + // * [x] When a client disconnects from the protocol port, the child process is killed and the forwarder terminates. + + let (cmd, args) = sleep_cmd(100); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let mut forwarder = TestForwarder::start(cmd, &args_refs); + + { + let _stream = forwarder.connect_protocol(); + // Disconnect by dropping the stream. + } + + // Forwarder should terminate. + assert!( + forwarder.wait_for_exit(), + "Forwarder should exit when protocol client disconnects" + ); +} + +#[test] +fn test_stderr_port_buffered_stderr_replay() { + // * [x] When a client connects to the `stderr` port, it first receives all buffered `stderr` data + // produced before the connection was established. + // * [x] After the buffered `stderr` data has been sent, the client continues to receive newly produced + // `stderr` data in real time. + + // Use a script that produces `stderr` immediately and then waits. + let (cmd, args) = multi_echo_stderr_cmd("buffered error", 1.0, "realtime error", 10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Now connect to `stderr`--buffering ensures we receive output produced before connection. + thread::sleep(Duration::from_millis(100)); + + // Connect and we should receive the buffered output first. + let mut stream = forwarder.connect_stderr(); + + // Read the output. + let output = read_all_available(&mut stream, Duration::from_secs(2)); + let output_str = String::from_utf8_lossy(&output); + + // Verify we got both buffered and realtime output. + assert!(output_str.contains("buffered error")); + assert!(output_str.contains("realtime error")); +} + +#[test] +fn test_stderr_disconnect_does_not_kill_child() { + // * [x] When a client disconnects from the `stderr` port, neither the forwarder nor the child + // process terminate because of that. + + let (cmd, args) = sleep_cmd(10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + { + let _stderr_stream = forwarder.connect_stderr(); + // Disconnect by dropping the stream. + } + + // With proactive disconnect detection, disconnect is detected immediately. + thread::sleep(Duration::from_millis(100)); + + // Forwarder should still be running--we can connect to health port. + assert!( + forwarder.connect_health().peer_addr().is_ok(), + "Forwarder should still be running after stderr disconnect" + ); +} + +#[test] +fn test_stderr_port_reconnect_continues_from_current_state() { + // * [x] When a client connects to the `stderr` port, it first receives all buffered `stderr` data + // produced before the connection was established and after a previous connection was active + // (i.e., no logging data is lost). + + // Use a script that outputs at controlled times: + // - "before_connection" immediately (buffered before any connection) + // - "during_first_connection" after 0.5 seconds (sent to first client) + // - "trigger_disconnect" after 1.5 seconds (triggers disconnect detection) + // - "while_disconnected" after 3 seconds (buffered while no client connected) + // - "during_second_connection" after 5 seconds (sent to second client) + let (cmd, args) = complex_stderr_reconnect_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Wait to ensure "before_connection" is buffered. + thread::sleep(Duration::from_millis(100)); + + // First connection--connect, read initial data, then disconnect BEFORE "trigger_disconnect". + { + let mut stream = forwarder.connect_stderr(); + // Read for 800ms to get "before_connection" (immediate) and "during_first_connection" (at t=0.5s) + let output = read_all_available(&mut stream, Duration::from_millis(800)); + let output_str = String::from_utf8_lossy(&output); + assert!(output_str.contains("before_connection")); + assert!(output_str.contains("during_first_connection")); + // Disconnect now (at ~t=1.0s), before "trigger_disconnect" (at t=1.5s) + let _ = stream.shutdown(std::net::Shutdown::Both); + drop(stream); + assert!( + !output_str.contains("trigger_disconnect"), + "Should not receive 'trigger_disconnect' in first connection" + ); + } + + // Now we’re at ~t=1.0s. "trigger_disconnect" will be produced at t=1.5s, which will + // cause the server to try writing to the disconnected client and detect the disconnect. + // With proactive disconnect detection, disconnect is detected quickly. + // Wait for "while_disconnected" to be produced (at t=3s from start). + thread::sleep(Duration::from_millis(2300)); + + // Second connection--should receive all buffered data (trigger_disconnect, while_disconnected) + // and realtime data (during_second_connection). No logging data must be lost. + { + let mut stream = forwarder.connect_stderr(); + // Read for 2.5s to get buffered and realtime data + let output = read_all_available(&mut stream, Duration::from_millis(2500)); + let output_str = String::from_utf8_lossy(&output); + + assert!( + output_str.contains("trigger_disconnect"), + "Second client should receive 'trigger_disconnect' (buffered during disconnect), got: {}", + output_str + ); + + assert!( + output_str.contains("while_disconnected"), + "Second client should receive 'while_disconnected' (buffered during disconnect), got: {}", + output_str + ); + + assert!( + output_str.contains("during_second_connection"), + "Second client should receive realtime data, got: {}", + output_str + ); + } +} + +#[test] +fn test_output_buffering_prevents_data_loss() { + // * [x] Output buffering must prevent loss of `stdout` and `stderr` data when no client is connected yet. + + // Start a process that produces output immediately. + let (cmd, args) = combined_output_cmd("stdout message", "stderr message", 10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Buffering ensures output is captured even if we connect immediately. + thread::sleep(Duration::from_millis(100)); + + // Now connect--we should receive the buffered output. + let mut stdout_stream = forwarder.connect_protocol(); + let stdout_data = read_all_available(&mut stdout_stream, Duration::from_millis(500)); + let stdout_str = String::from_utf8_lossy(&stdout_data); + + let mut stderr_stream = forwarder.connect_stderr(); + let stderr_data = read_all_available(&mut stderr_stream, Duration::from_millis(500)); + let stderr_str = String::from_utf8_lossy(&stderr_data); + + assert!(stdout_str.contains("stdout message")); + assert!(stderr_str.contains("stderr message")); +} + +#[test] +fn test_health_port_indicates_readiness() { + // * [x] A successful TCP connection to the health port indicates that the forwarder is ready to accept connections and operate normally. + + let (cmd, args) = sleep_cmd(10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // If we can connect to health port, the forwarder is ready. + let health_stream = forwarder.connect_health(); + assert!(health_stream.peer_addr().is_ok()); + + // And the other ports should also be accessible. + assert!(forwarder.connect_protocol().peer_addr().is_ok()); + assert!(forwarder.connect_stderr().peer_addr().is_ok()); +} + +#[test] +fn test_health_checks_do_not_interfere() { + // * [x] Health checks on the health port must not interfere with the behavior of the protocol port or the `stderr` port. + + // Use a process that produces high-volume output on both `stdout` and `stderr`. + // Output a unique numbered line every 10ms for 3 seconds (300 lines on each stream). + let (cmd, args) = numbered_output_loop_cmd(300, 10); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Spawn a thread to continuously perform health checks for 3.5 seconds. + let health_port = forwarder.ports.health_port(); + let health_check_handle = thread::spawn(move || { + let start = std::time::Instant::now(); + let mut check_count = 0; + while start.elapsed() < Duration::from_millis(3500) { + if TcpStream::connect(("127.0.0.1", health_port)).is_ok() { + check_count += 1; + } + thread::sleep(Duration::from_millis(20)); + } + check_count + }); + + // Spawn a thread to read from the protocol port (`stdout`). + let mut protocol_stream = forwarder.connect_protocol(); + let protocol_handle = thread::spawn(move || { + let mut all_output = Vec::new(); + let mut buffer = [0u8; 8192]; + protocol_stream + .set_read_timeout(Some(Duration::from_secs(4))) + .ok(); + + loop { + match protocol_stream.read(&mut buffer) { + Ok(0) => break, + Ok(n) => all_output.extend_from_slice(&buffer[..n]), + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(_) => break, + } + } + all_output + }); + + // Spawn a thread to read from the stderr port (`stderr`). + let mut stderr_stream = forwarder.connect_stderr(); + let stderr_handle = thread::spawn(move || { + let mut all_output = Vec::new(); + let mut buffer = [0u8; 8192]; + stderr_stream + .set_read_timeout(Some(Duration::from_secs(4))) + .ok(); + + loop { + match stderr_stream.read(&mut buffer) { + Ok(0) => break, + Ok(n) => all_output.extend_from_slice(&buffer[..n]), + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(_) => break, + } + } + all_output + }); + + // Wait for all threads to complete. + let health_check_count = health_check_handle + .join() + .expect("Health check thread panicked"); + let protocol_output = protocol_handle.join().expect("Protocol thread panicked"); + let stderr_output = stderr_handle.join().expect("Stderr thread panicked"); + + // Verify that health checks were performed successfully. + // Note: The count varies significantly by platform (Linux/Windows: ~150+, macOS: ~35-40) + // due to differences in TCP connection establishment speed. We just verify that + // a reasonable number of health checks occurred without interfering with data transfer. + assert!( + health_check_count > 20, + "Should have performed multiple health checks (got {})", + health_check_count + ); + + // Verify that we received substantial data on both ports despite constant health checks. + let protocol_str = String::from_utf8_lossy(&protocol_output); + let stderr_str = String::from_utf8_lossy(&stderr_output); + + // Should have received most of the lines (allowing for some buffering delays at the end). + let protocol_line_count = protocol_str.matches("stdout_line_").count(); + let stderr_line_count = stderr_str.matches("stderr_line_").count(); + + assert!( + protocol_line_count >= 250, + "Should have received most stdout lines despite health checks (got {})", + protocol_line_count + ); + + assert!( + stderr_line_count >= 250, + "Should have received most stderr lines despite health checks (got {})", + stderr_line_count + ); + + // Verify data integrity: check for a few specific lines. + assert!(protocol_str.contains("stdout_line_1")); + assert!(protocol_str.contains("stdout_line_100")); + assert!(stderr_str.contains("stderr_line_1")); + assert!(stderr_str.contains("stderr_line_100")); +} + +#[test] +fn test_works_with_various_executables() { + // * [x] The forwarder must work not only for Python applications, but for any executable child process. + + // Test with various common executables. + + // Test with `echo` via shell command. + { + let (cmd, args) = echo_with_sleep_cmd("test1", 2); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + let mut stream = forwarder.connect_protocol(); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + assert!(String::from_utf8_lossy(&output).contains("test1")); + } + + // Test with another echo. + { + let (cmd, args) = echo_with_sleep_cmd("test2", 2); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + let mut stream = forwarder.connect_protocol(); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + assert!(String::from_utf8_lossy(&output).contains("test2")); + } + + // Test with `cat` (interactive). + { + let (cmd, args) = cat_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + let mut stream = forwarder.connect_protocol(); + stream.write_all(b"test3\n").expect("Failed to write"); + stream.flush().expect("Failed to flush"); + let output = read_all_available(&mut stream, Duration::from_millis(500)); + assert!(String::from_utf8_lossy(&output).contains("test3")); + } + + // Test with a Python script (if available). + // Note: Python startup can be slow on Windows, so we use a longer timeout. + { + let python = python_cmd(); + let forwarder = TestForwarder::start( + python, + &["-u", "-c", "import time; print('test4'); time.sleep(2)"], + ); + let mut stream = forwarder.connect_protocol(); + // Delay to ensure Python has started and produced output. + thread::sleep(Duration::from_millis(300)); + // Increased timeout to account for Python interpreter startup (especially on Windows). + let output = read_all_available(&mut stream, Duration::from_millis(3000)); + assert!( + String::from_utf8_lossy(&output).contains("test4"), + "Expected 'test4' in output, got: {:?}", + String::from_utf8_lossy(&output) + ); + } +} + +#[test] +fn test_large_output_buffering() { + // Additional test: verify that large outputs are buffered correctly. + + // Generate a large output. + let large_size = 100_000; + let (cmd, args) = generate_large_output_cmd(large_size); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + + // Wait for output to be generated. + thread::sleep(Duration::from_millis(200)); + + // Connect and read the buffered output. + let mut stream = forwarder.connect_protocol(); + let mut total_read = 0; + let mut buffer = [0u8; 8192]; + + while total_read < large_size { + match read_with_timeout(&mut stream, &mut buffer) { + Ok(0) => break, + Ok(n) => total_read += n, + Err(_) => break, + } + } + + assert!( + total_read >= large_size, + "Should have read at least {} bytes, got {}", + large_size, + total_read + ); +} + +#[test] +fn test_concurrent_stdin_stdout_bidirectional() { + // Additional test: verify bidirectional communication works correctly. + + // Use `cat` which echoes `stdin` to `stdout`. + let (cmd, args) = cat_cmd(); + let args_refs: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + let forwarder = TestForwarder::start(cmd, &args_refs); + let mut stream = forwarder.connect_protocol(); + + // Send multiple lines and verify echo. + for i in 0..5 { + let message = format!("line {i}\n"); + stream + .write_all(message.as_bytes()) + .expect("Failed to write"); + stream.flush().expect("Failed to flush"); + + // Increased timeout for Python startup on Windows. + let output = read_all_available(&mut stream, Duration::from_millis(1500)); + let output_str = String::from_utf8_lossy(&output); + assert!( + output_str.contains(&format!("line {i}")), + "Expected 'line {i}' in output, got: {:?}", + output_str + ); + } +} + +// ============================================================================ +// LSP INTEGRATION TEST +// ============================================================================ + +mod lsp_client; + +#[test] +fn test_lsp_rust_analyzer_integration() { + // Test that stdioxide can successfully tunnel LSP communication with `rust-analyzer`. + // This validates the real-world use case of running a language server through the forwarder. + + let mut forwarder = TestForwarder::start("rust-analyzer", &[]); + let stream = forwarder.connect_protocol(); + + let mut lsp = LspClient::new(stream); + + // Initialize the LSP server. + let workspace_path = std::env::current_dir() + .expect("Failed to get current directory") + .to_string_lossy() + .to_string(); + let root_uri = format!("file://{}", workspace_path); + + let init_response = lsp.initialize(&root_uri); + + assert_eq!(init_response["jsonrpc"], "2.0"); + assert!( + init_response["result"]["capabilities"].is_object(), + "Should receive server capabilities" + ); + + // Send initialized notification. + lsp.initialized(); + + // Open a document (src/main.rs). + let main_rs_path = format!("{workspace_path}/src/main.rs"); + let main_rs_uri = format!("file://{main_rs_path}"); + let main_rs_content = + std::fs::read_to_string(&main_rs_path).expect("Failed to read src/main.rs"); + + lsp.did_open(&main_rs_uri, "rust", main_rs_content); + + // Request document symbols. + let symbols_response = lsp.document_symbol(&main_rs_uri); + + assert_eq!(symbols_response["jsonrpc"], "2.0"); + + // Verify we got some symbols (src/main.rs should have at least the main function). + let symbols = symbols_response["result"] + .as_array() + .expect("Expected array of symbols"); + + assert!( + !symbols.is_empty(), + "Should have received symbols for src/main.rs" + ); + + // Verify at least one symbol has a name (e.g., "main"). + let has_named_symbol = symbols.iter().any(|sym| sym["name"].is_string()); + assert!( + has_named_symbol, + "Should have at least one named symbol in src/main.rs" + ); + + // Shutdown the LSP server. + let shutdown_response = lsp.shutdown(); + assert_eq!(shutdown_response["result"], serde_json::Value::Null); + + // Exit notification is sent automatically when lsp is dropped. + drop(lsp); + + // Forwarder should terminate after LSP client disconnects from protocol port + // or because the LSP server process exits on shutdown (either reason is okay). + assert!( + forwarder.wait_for_exit(), + "Forwarder should exit after LSP client disconnects from protocol port" + ); +} diff --git a/tests/lsp_client.rs b/tests/lsp_client.rs new file mode 100644 index 0000000..6a81b3a --- /dev/null +++ b/tests/lsp_client.rs @@ -0,0 +1,201 @@ +use std::{ + io::{Read, Write}, + net::TcpStream, + thread, + time::Duration, +}; + +/// RAII wrapper for LSP communication over a TCP stream. +/// Automatically sends the exit notification when dropped. +pub struct LspClient { + stream: TcpStream, + next_request_id: i32, +} + +impl LspClient { + /// Create a new LSP client from a TCP stream. + pub fn new(stream: TcpStream) -> Self { + // Set reasonable timeouts for LSP communication. + stream + .set_read_timeout(Some(Duration::from_secs(10))) + .expect("Failed to set read timeout"); + stream + .set_write_timeout(Some(Duration::from_secs(5))) + .expect("Failed to set write timeout"); + + Self { + stream, + next_request_id: 1, + } + } + + /// Send an LSP message over the stream. + /// LSP uses JSON-RPC 2.0 with a Content-Length header. + fn send_message(&mut self, message: &serde_json::Value) -> std::io::Result<()> { + let json_str = serde_json::to_string(message)?; + let content = format!("Content-Length: {}\r\n\r\n{}", json_str.len(), json_str); + self.stream.write_all(content.as_bytes())?; + self.stream.flush()?; + Ok(()) + } + + /// Read an LSP message from the stream. + /// Returns the parsed JSON value. + fn read_message(&mut self) -> std::io::Result { + // Read the Content-Length header. + let mut header = String::new(); + let mut buffer = [0u8; 1]; + + // Read until we find "\r\n\r\n" + loop { + self.stream.read_exact(&mut buffer)?; + header.push(buffer[0] as char); + if header.ends_with("\r\n\r\n") { + break; + } + // Prevent infinite loops on malformed headers + if header.len() > 1000 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Header too long", + )); + } + } + + // Parse Content-Length + let content_length = header + .lines() + .find(|line| line.starts_with("Content-Length:")) + .and_then(|line| line.strip_prefix("Content-Length:")) + .and_then(|len_str| len_str.trim().parse::().ok()) + .ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "Missing Content-Length") + })?; + + // Read the JSON content. + let mut content = vec![0u8; content_length]; + self.stream.read_exact(&mut content)?; + + // Parse JSON. + let json: serde_json::Value = serde_json::from_slice(&content)?; + Ok(json) + } + + /// Send an LSP request and return the next request ID to use. + fn send_request(&mut self, method: &str, params: serde_json::Value) -> i32 { + let request_id = self.next_request_id; + self.next_request_id += 1; + + let request = serde_json::json!({ + "jsonrpc": "2.0", + "id": request_id, + "method": method, + "params": params + }); + + self.send_message(&request) + .expect("Failed to send LSP request"); + request_id + } + + /// Send an LSP notification (no response expected). + fn send_notification(&mut self, method: &str, params: serde_json::Value) { + let notification = serde_json::json!({ + "jsonrpc": "2.0", + "method": method, + "params": params + }); + + self.send_message(¬ification) + .expect("Failed to send LSP notification"); + } + + /// Read responses until we get a response with the specified ID. + /// Skips notifications that may arrive in between. + fn read_response(&mut self, expected_id: i32) -> serde_json::Value { + for _ in 0..20 { + match self.read_message() { + Ok(msg) => { + // Check if this is our response. + if msg.get("id") == Some(&serde_json::json!(expected_id)) { + return msg; + } + // Otherwise, it's a notification, keep reading. + } + Err(e) if e.kind() == std::io::ErrorKind::TimedOut => { + thread::sleep(Duration::from_millis(100)); + continue; + } + Err(e) => { + panic!("Failed to read LSP response: {}", e); + } + } + } + panic!("Did not receive response with id {}", expected_id); + } + + /// Initialize the LSP server with the given workspace root. + pub fn initialize(&mut self, root_uri: &str) -> serde_json::Value { + let params = serde_json::json!({ + "processId": null, + "rootUri": root_uri, + "capabilities": { + "textDocument": { + "hover": { + "contentFormat": ["plaintext", "markdown"] + } + } + } + }); + + let request_id = self.send_request("initialize", params); + self.read_response(request_id) + } + + /// Send the initialized notification. + pub fn initialized(&mut self) { + self.send_notification("initialized", serde_json::json!({})); + } + + /// Open a document. + pub fn did_open(&mut self, uri: &str, language_id: &str, text: String) { + let params = serde_json::json!({ + "textDocument": { + "uri": uri, + "languageId": language_id, + "version": 1, + "text": text + } + }); + + self.send_notification("textDocument/didOpen", params); + } + + /// Request document symbols for a file. + pub fn document_symbol(&mut self, uri: &str) -> serde_json::Value { + let params = serde_json::json!({ + "textDocument": { + "uri": uri + } + }); + + let request_id = self.send_request("textDocument/documentSymbol", params); + self.read_response(request_id) + } + + /// Shutdown the LSP server. + pub fn shutdown(&mut self) -> serde_json::Value { + let request_id = self.send_request("shutdown", serde_json::json!(null)); + self.read_response(request_id) + } +} + +impl Drop for LspClient { + fn drop(&mut self) { + // Automatically send exit notification when the client is dropped. + let _ = self.send_message(&serde_json::json!({ + "jsonrpc": "2.0", + "method": "exit" + })); + } +} diff --git a/tests/test_utils.rs b/tests/test_utils.rs new file mode 100644 index 0000000..35eb397 --- /dev/null +++ b/tests/test_utils.rs @@ -0,0 +1,406 @@ +//! Cross-platform test utilities for spawning commands that work on both Unix and Windows. +//! +//! This module provides helper functions that abstract over platform-specific commands. +//! On Unix, commands use `bash`, `sleep`, `cat`, etc. +//! On Windows, commands use `cmd`, `powershell`, `ping` (for delays), etc. +//! +//! The goal is to make integration tests work on all platforms without #[cfg(not(windows))] +//! guards scattered throughout the test code. + +#[cfg(windows)] +pub fn sleep_cmd(seconds: u32) -> (&'static str, Vec) { + // Use ping as a sleep alternative on Windows + // Pings localhost N+1 times with 1-second intervals (approximately N seconds total) + let pings = seconds + 1; + ( + "ping", + vec!["-n".to_string(), pings.to_string(), "127.0.0.1".to_string()], + ) +} + +#[cfg(not(windows))] +pub fn sleep_cmd(seconds: u32) -> (&'static str, Vec) { + ("sleep", vec![seconds.to_string()]) +} + +#[cfg(windows)] +pub fn echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { + let pings = seconds + 1; + ( + "cmd", + vec![ + "/C".to_string(), + format!("echo {} && ping -n {} 127.0.0.1 >nul", text, pings), + ], + ) +} + +#[cfg(not(windows))] +pub fn echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!("echo '{}' && sleep {}", text, seconds), + ], + ) +} + +#[cfg(windows)] +pub fn stderr_echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { + let pings = seconds + 1; + ( + "cmd", + vec![ + "/C".to_string(), + format!("echo {} 1>&2 && ping -n {} 127.0.0.1 >nul", text, pings), + ], + ) +} + +#[cfg(not(windows))] +pub fn stderr_echo_with_sleep_cmd(text: &str, seconds: u32) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!("echo '{}' >&2 && sleep {}", text, seconds), + ], + ) +} + +#[cfg(windows)] +pub fn multi_echo_stderr_cmd( + buffered: &str, + sleep1: f32, + realtime: &str, + sleep2: u32, +) -> (&'static str, Vec) { + let sleep1_ms = (sleep1 * 1000.0) as u32; + let pings = sleep2 + 1; + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!( + "[Console]::Error.WriteLine('{}'); Start-Sleep -Milliseconds {}; [Console]::Error.WriteLine('{}'); ping -n {} 127.0.0.1 >$null", + buffered, sleep1_ms, realtime, pings + ), + ], + ) +} + +#[cfg(not(windows))] +pub fn multi_echo_stderr_cmd( + buffered: &str, + sleep1: f32, + realtime: &str, + sleep2: u32, +) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!( + "echo '{}' >&2; sleep {}; echo '{}' >&2; sleep {}", + buffered, sleep1, realtime, sleep2 + ), + ], + ) +} + +#[cfg(windows)] +pub fn multi_echo_stdout_cmd( + buffered: &str, + sleep1: f32, + realtime: &str, + sleep2: u32, +) -> (&'static str, Vec) { + let sleep1_ms = (sleep1 * 1000.0) as u32; + let pings = sleep2 + 1; + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!( + "Write-Output '{}'; Start-Sleep -Milliseconds {}; Write-Output '{}'; ping -n {} 127.0.0.1 >$null", + buffered, sleep1_ms, realtime, pings + ), + ], + ) +} + +#[cfg(not(windows))] +pub fn multi_echo_stdout_cmd( + buffered: &str, + sleep1: f32, + realtime: &str, + sleep2: u32, +) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!( + "echo '{}'; sleep {}; echo '{}'; sleep {}", + buffered, sleep1, realtime, sleep2 + ), + ], + ) +} + +#[cfg(windows)] +pub fn cat_cmd() -> (&'static str, Vec) { + // Use Python for reliable line-by-line I/O on Windows. + // `-u` flag disables buffering for immediate output. + ( + python_cmd(), + vec![ + "-u".to_string(), + "-c".to_string(), + "import sys; [print(line.rstrip()) for line in sys.stdin]".to_string(), + ], + ) +} + +#[cfg(not(windows))] +pub fn cat_cmd() -> (&'static str, Vec) { + ("cat", vec![]) +} + +#[cfg(windows)] +pub fn loop_stdin_to_stdout_cmd() -> (&'static str, Vec) { + // PowerShell script that reads line by line and echoes + // Use [Console]::In to read from stdin and [Console]::WriteLine() for immediate flushing + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + "while($line = [Console]::In.ReadLine()) { [Console]::WriteLine('response') }" + .to_string(), + ], + ) +} + +#[cfg(not(windows))] +pub fn loop_stdin_to_stdout_cmd() -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + "while true; do read line; echo response; done".to_string(), + ], + ) +} + +#[cfg(windows)] +pub fn continuous_stderr_loop_cmd() -> (&'static str, Vec) { + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + "while($true) { [Console]::Error.WriteLine('error'); Start-Sleep -Milliseconds 100 }" + .to_string(), + ], + ) +} + +#[cfg(not(windows))] +pub fn continuous_stderr_loop_cmd() -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + "while true; do echo error >&2; sleep 0.1; done".to_string(), + ], + ) +} + +#[cfg(windows)] +pub fn generate_large_output_cmd(size: usize) -> (&'static str, Vec) { + // Generate large output using PowerShell + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!("'A' * {}; ping -n 11 127.0.0.1 >$null", size), + ], + ) +} + +#[cfg(not(windows))] +pub fn generate_large_output_cmd(size: usize) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!("head -c {} /dev/zero | tr '\\0' 'A'; sleep 10", size), + ], + ) +} + +#[cfg(windows)] +pub fn numbered_output_loop_cmd(count: u32, interval_ms: u32) -> (&'static str, Vec) { + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!( + "1..{} | ForEach-Object {{ Write-Output \"stdout_line_$_\"; [Console]::Error.WriteLine(\"stderr_line_$_\"); Start-Sleep -Milliseconds {} }}", + count, interval_ms + ), + ], + ) +} + +#[cfg(not(windows))] +pub fn numbered_output_loop_cmd(count: u32, interval_ms: u32) -> (&'static str, Vec) { + let interval_sec = interval_ms as f32 / 1000.0; + ( + "bash", + vec![ + "-c".to_string(), + format!( + "for i in {{1..{count}}}; do echo \"stdout_line_$i\"; echo \"stderr_line_$i\" >&2; sleep {interval_sec}; done" + ), + ], + ) +} + +#[cfg(windows)] +pub fn complex_stderr_reconnect_cmd() -> (&'static str, Vec) { + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + concat!( + "[Console]::Error.WriteLine('before_connection'); Start-Sleep -Milliseconds 500; ", + "[Console]::Error.WriteLine('during_first_connection'); Start-Sleep -Milliseconds 1000; ", + "[Console]::Error.WriteLine('trigger_disconnect'); Start-Sleep -Milliseconds 1500; ", + "[Console]::Error.WriteLine('while_disconnected'); Start-Sleep -Milliseconds 2000; ", + "[Console]::Error.WriteLine('during_second_connection'); Start-Sleep -Seconds 10" + ).to_string(), + ], + ) +} + +#[cfg(not(windows))] +pub fn complex_stderr_reconnect_cmd() -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + concat!( + "echo 'before_connection' >&2; sleep 0.5; ", + "echo 'during_first_connection' >&2; sleep 1; ", + "echo 'trigger_disconnect' >&2; sleep 1.5; ", + "echo 'while_disconnected' >&2; sleep 2; ", + "echo 'during_second_connection' >&2; sleep 10", + ) + .to_string(), + ], + ) +} + +#[cfg(windows)] +pub fn combined_output_cmd( + stdout_msg: &str, + stderr_msg: &str, + sleep_sec: u32, +) -> (&'static str, Vec) { + let pings = sleep_sec + 1; + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!( + "Write-Output '{}'; [Console]::Error.WriteLine('{}'); ping -n {} 127.0.0.1 >$null", + stdout_msg, stderr_msg, pings + ), + ], + ) +} + +#[cfg(not(windows))] +pub fn combined_output_cmd( + stdout_msg: &str, + stderr_msg: &str, + sleep_sec: u32, +) -> (&'static str, Vec) { + ( + "bash", + vec![ + "-c".to_string(), + format!( + "echo '{}'; echo '{}' >&2; sleep {}", + stdout_msg, stderr_msg, sleep_sec + ), + ], + ) +} + +#[cfg(windows)] +pub fn echo_args_cmd(args: &[&str]) -> (&'static str, Vec) { + let mut cmd_args = vec!["/C".to_string()]; + // Use echo %* to print all arguments on Windows (requires a batch context) + // Alternative: build the echo command with all args + let echo_str = args.join(" "); + cmd_args.push(format!("echo {} && ping -n 6 127.0.0.1 >nul", echo_str)); + ("cmd", cmd_args) +} + +#[cfg(not(windows))] +pub fn echo_args_cmd(args: &[&str]) -> (&'static str, Vec) { + let mut script_args = vec![ + "-c".to_string(), + "echo $@ && sleep 5".to_string(), + "--".to_string(), + ]; + script_args.extend(args.iter().map(|s| s.to_string())); + ("bash", script_args) +} + +#[cfg(windows)] +pub fn short_lived_cmd(msg: &str, sleep_ms: u32) -> (&'static str, Vec) { + ( + "powershell", + vec![ + "-NoProfile".to_string(), + "-Command".to_string(), + format!( + "Write-Output '{}'; Start-Sleep -Milliseconds {}", + msg, sleep_ms + ), + ], + ) +} + +#[cfg(not(windows))] +pub fn short_lived_cmd(msg: &str, sleep_ms: u32) -> (&'static str, Vec) { + let sleep_sec = sleep_ms as f32 / 1000.0; + ( + "bash", + vec![ + "-c".to_string(), + format!("echo {} && sleep {}", msg, sleep_sec), + ], + ) +} + +#[cfg(windows)] +pub fn python_cmd() -> &'static str { + "python" +} + +#[cfg(not(windows))] +pub fn python_cmd() -> &'static str { + "python3" +}