@@ -13,8 +13,11 @@ use std::ffi::OsString;
1313use std::io;
1414use std::path::PathBuf;
1515use std::process::Stdio;
16+ #[cfg(unix)]
1617use std::thread::sleep;
18+ #[cfg(unix)]
1719use std::thread::spawn;
20+ #[cfg(unix)]
1821use std::time::Duration;
1922
2023#[cfg(unix)]
@@ -33,6 +36,8 @@ use tracing::warn;
3336use crate::program_resolver;
3437use crate::utils::create_env_for_mcp_server;
3538
39+ // General purpose public code.
40+
3641/// Launches an MCP stdio server and returns the byte transport for rmcp.
3742///
3843/// This trait is the boundary between MCP lifecycle code and process placement.
@@ -47,14 +52,6 @@ pub trait StdioServerLauncher: private::Sealed + Send + Sync {
4752 ) -> BoxFuture<'static, io::Result<LaunchedStdioServer>>;
4853}
4954
50- /// Starts MCP stdio servers as local child processes.
51- ///
52- /// This is the existing behavior for local MCP servers: the orchestrator
53- /// process spawns the configured command and rmcp talks to the child's local
54- /// stdin/stdout pipes directly.
55- #[derive(Clone)]
56- pub struct LocalStdioServerLauncher;
57-
5855/// Command-line process shape shared by stdio server launchers.
5956#[derive(Clone)]
6057pub struct StdioServerCommand {
@@ -81,23 +78,6 @@ pub(super) enum LaunchedStdioServerTransport {
8178 },
8279}
8380
84- #[cfg(unix)]
85- const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
86-
87- #[cfg(unix)]
88- pub(super) struct ProcessGroupGuard {
89- process_group_id: u32,
90- }
91-
92- #[cfg(not(unix))]
93- pub(super) struct ProcessGroupGuard;
94-
95- mod private {
96- pub trait Sealed {}
97- }
98-
99- impl private::Sealed for LocalStdioServerLauncher {}
100-
10181impl StdioServerCommand {
10282 /// Build the stdio process parameters before choosing where the process
10383 /// runs.
@@ -118,70 +98,102 @@ impl StdioServerCommand {
11898 }
11999}
120100
101+ // Local public implementation.
102+
103+ /// Starts MCP stdio servers as local child processes.
104+ ///
105+ /// This is the existing behavior for local MCP servers: the orchestrator
106+ /// process spawns the configured command and rmcp talks to the child's local
107+ /// stdin/stdout pipes directly.
108+ #[derive(Clone)]
109+ pub struct LocalStdioServerLauncher;
110+
121111impl StdioServerLauncher for LocalStdioServerLauncher {
122112 fn launch(
123113 &self,
124114 command: StdioServerCommand,
125115 ) -> BoxFuture<'static, io::Result<LaunchedStdioServer>> {
126- async move { launch_stdio_server_locally (command) }.boxed()
116+ async move { Self::launch_server (command) }.boxed()
127117 }
128118}
129119
130- fn launch_stdio_server_locally(command: StdioServerCommand) -> io::Result<LaunchedStdioServer> {
131- let StdioServerCommand {
132- program,
133- args,
134- env,
135- env_vars,
136- cwd,
137- } = command;
138- let program_name = program.to_string_lossy().into_owned();
139- let envs = create_env_for_mcp_server(env, &env_vars);
140- let resolved_program = program_resolver::resolve(program, &envs).map_err(io::Error::other)?;
141-
142- let mut command = Command::new(resolved_program);
143- command
144- .kill_on_drop(true)
145- .stdin(Stdio::piped())
146- .stdout(Stdio::piped())
147- .env_clear()
148- .envs(envs)
149- .args(args);
150- #[cfg(unix)]
151- command.process_group(0);
152- if let Some(cwd) = cwd {
153- command.current_dir(cwd);
154- }
120+ // Local private implementation.
155121
156- let (transport, stderr) = TokioChildProcess::builder(command)
157- .stderr(Stdio::piped())
158- .spawn()?;
159- let process_group_guard = transport.id().map(ProcessGroupGuard::new);
160-
161- if let Some(stderr) = stderr {
162- tokio::spawn(async move {
163- let mut reader = BufReader::new(stderr).lines();
164- loop {
165- match reader.next_line().await {
166- Ok(Some(line)) => {
167- info!("MCP server stderr ({program_name}): {line}");
168- }
169- Ok(None) => break,
170- Err(error) => {
171- warn!("Failed to read MCP server stderr ({program_name}): {error}");
172- break;
122+ #[cfg(unix)]
123+ const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
124+
125+ #[cfg(unix)]
126+ pub(super) struct ProcessGroupGuard {
127+ process_group_id: u32,
128+ }
129+
130+ #[cfg(not(unix))]
131+ pub(super) struct ProcessGroupGuard;
132+
133+ mod private {
134+ pub trait Sealed {}
135+ }
136+
137+ impl private::Sealed for LocalStdioServerLauncher {}
138+
139+ impl LocalStdioServerLauncher {
140+ fn launch_server(command: StdioServerCommand) -> io::Result<LaunchedStdioServer> {
141+ let StdioServerCommand {
142+ program,
143+ args,
144+ env,
145+ env_vars,
146+ cwd,
147+ } = command;
148+ let program_name = program.to_string_lossy().into_owned();
149+ let envs = create_env_for_mcp_server(env, &env_vars);
150+ let resolved_program =
151+ program_resolver::resolve(program, &envs).map_err(io::Error::other)?;
152+
153+ let mut command = Command::new(resolved_program);
154+ command
155+ .kill_on_drop(true)
156+ .stdin(Stdio::piped())
157+ .stdout(Stdio::piped())
158+ .env_clear()
159+ .envs(envs)
160+ .args(args);
161+ #[cfg(unix)]
162+ command.process_group(0);
163+ if let Some(cwd) = cwd {
164+ command.current_dir(cwd);
165+ }
166+
167+ let (transport, stderr) = TokioChildProcess::builder(command)
168+ .stderr(Stdio::piped())
169+ .spawn()?;
170+ let process_group_guard = transport.id().map(ProcessGroupGuard::new);
171+
172+ if let Some(stderr) = stderr {
173+ tokio::spawn(async move {
174+ let mut reader = BufReader::new(stderr).lines();
175+ loop {
176+ match reader.next_line().await {
177+ Ok(Some(line)) => {
178+ info!("MCP server stderr ({program_name}): {line}");
179+ }
180+ Ok(None) => break,
181+ Err(error) => {
182+ warn!("Failed to read MCP server stderr ({program_name}): {error}");
183+ break;
184+ }
173185 }
174186 }
175- }
176- });
177- }
187+ });
188+ }
178189
179- Ok(LaunchedStdioServer {
180- transport: LaunchedStdioServerTransport::Local {
181- transport,
182- process_group_guard,
183- },
184- })
190+ Ok(LaunchedStdioServer {
191+ transport: LaunchedStdioServerTransport::Local {
192+ transport,
193+ process_group_guard,
194+ },
195+ })
196+ }
185197}
186198
187199impl ProcessGroupGuard {
0 commit comments