|
| 1 | +//! rmcp transport adapter for an executor-managed MCP stdio process. |
| 2 | +//! |
| 3 | +//! This module owns the lower-level byte translation after |
| 4 | +//! `stdio_server_launcher` has already started a process through |
| 5 | +//! `ExecBackend::start`. It does not choose where the MCP server runs and it |
| 6 | +//! does not implement MCP lifecycle behavior. MCP protocol ownership stays in |
| 7 | +//! `RmcpClient` and rmcp: |
| 8 | +//! |
| 9 | +//! 1. rmcp serializes a JSON-RPC message and calls [`Transport::send`]. |
| 10 | +//! 2. This transport appends the stdio newline delimiter and writes those bytes |
| 11 | +//! to executor `process/write`. |
| 12 | +//! 3. The executor writes the bytes to the child process stdin. |
| 13 | +//! 4. The child writes newline-delimited JSON-RPC messages to stdout. |
| 14 | +//! 5. The executor reports stdout bytes through pushed process events. |
| 15 | +//! 6. This transport buffers stdout until it has one full line, deserializes |
| 16 | +//! that line, and returns the rmcp message from [`Transport::receive`]. |
| 17 | +//! |
| 18 | +//! Stderr is deliberately not part of the MCP byte stream. It is logged for |
| 19 | +//! diagnostics only, matching the local stdio implementation. |
| 20 | +
|
| 21 | +use std::future::Future; |
| 22 | +use std::io; |
| 23 | +use std::mem::take; |
| 24 | +use std::sync::Arc; |
| 25 | +use std::sync::atomic::AtomicUsize; |
| 26 | +use std::sync::atomic::Ordering; |
| 27 | + |
| 28 | +use codex_exec_server::ExecOutputStream; |
| 29 | +use codex_exec_server::ExecProcess; |
| 30 | +use codex_exec_server::ExecProcessEvent; |
| 31 | +use codex_exec_server::ProcessId; |
| 32 | +use codex_exec_server::ProcessOutputChunk; |
| 33 | +use codex_exec_server::WriteStatus; |
| 34 | +use rmcp::service::RoleClient; |
| 35 | +use rmcp::service::RxJsonRpcMessage; |
| 36 | +use rmcp::service::TxJsonRpcMessage; |
| 37 | +use rmcp::transport::Transport; |
| 38 | +use serde_json::from_slice; |
| 39 | +use serde_json::to_vec; |
| 40 | +use tokio::sync::broadcast; |
| 41 | +use tracing::debug; |
| 42 | +use tracing::info; |
| 43 | +use tracing::warn; |
| 44 | + |
| 45 | +static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1); |
| 46 | + |
| 47 | +// Remote public implementation. |
| 48 | + |
| 49 | +/// A client-side rmcp transport backed by an executor-managed process. |
| 50 | +/// |
| 51 | +/// The orchestrator owns this value and calls rmcp on it. The process it wraps |
| 52 | +/// may be local or remote depending on the `ExecBackend` used to create it, but |
| 53 | +/// for remote MCP stdio the process lives on the executor and all interaction |
| 54 | +/// crosses the executor process RPC boundary. |
| 55 | +pub(super) struct ExecutorProcessTransport { |
| 56 | + /// Logical process handle returned by the executor process API. |
| 57 | + /// |
| 58 | + /// `write` forwards stdin bytes. `terminate` stops the child when rmcp |
| 59 | + /// closes the transport. |
| 60 | + process: Arc<dyn ExecProcess>, |
| 61 | + |
| 62 | + /// Pushed output/lifecycle stream for the process. |
| 63 | + /// |
| 64 | + /// The executor process API still supports retained-output reads, but MCP |
| 65 | + /// stdio is naturally streaming. This receiver lets rmcp wait for stdout |
| 66 | + /// chunks without issuing `process/read` after each output notification. |
| 67 | + events: broadcast::Receiver<ExecProcessEvent>, |
| 68 | + |
| 69 | + /// Human-readable program name used only in diagnostics. |
| 70 | + program_name: String, |
| 71 | + |
| 72 | + /// Buffered child stdout bytes that have not yet formed a complete |
| 73 | + /// newline-delimited JSON-RPC message. |
| 74 | + stdout: Vec<u8>, |
| 75 | + |
| 76 | + /// Buffered stderr bytes for diagnostic logging. |
| 77 | + stderr: Vec<u8>, |
| 78 | + |
| 79 | + /// Whether the executor has reported process closure or a terminal |
| 80 | + /// subscription failure. Once closed, any remaining partial stdout line is |
| 81 | + /// flushed once and then rmcp receives EOF. |
| 82 | + closed: bool, |
| 83 | +} |
| 84 | + |
| 85 | +impl ExecutorProcessTransport { |
| 86 | + pub(super) fn new(process: Arc<dyn ExecProcess>, program_name: String) -> Self { |
| 87 | + let events = process.subscribe_events(); |
| 88 | + Self { |
| 89 | + process, |
| 90 | + events, |
| 91 | + program_name, |
| 92 | + stdout: Vec::new(), |
| 93 | + stderr: Vec::new(), |
| 94 | + closed: false, |
| 95 | + } |
| 96 | + } |
| 97 | + |
| 98 | + pub(super) fn next_process_id() -> ProcessId { |
| 99 | + // Process IDs are logical handles scoped to the executor connection, |
| 100 | + // not OS pids. A monotonic client-side id is enough to avoid |
| 101 | + // collisions between MCP servers started in the same session. |
| 102 | + let index = PROCESS_COUNTER.fetch_add(1, Ordering::Relaxed); |
| 103 | + ProcessId::from(format!("mcp-stdio-{index}")) |
| 104 | + } |
| 105 | +} |
| 106 | + |
| 107 | +impl Transport<RoleClient> for ExecutorProcessTransport { |
| 108 | + type Error = io::Error; |
| 109 | + |
| 110 | + fn send( |
| 111 | + &mut self, |
| 112 | + item: TxJsonRpcMessage<RoleClient>, |
| 113 | + ) -> impl Future<Output = std::result::Result<(), Self::Error>> + Send + 'static { |
| 114 | + let process = Arc::clone(&self.process); |
| 115 | + async move { |
| 116 | + // rmcp hands us a structured JSON-RPC message. Stdio transport on |
| 117 | + // the wire is JSON plus one newline delimiter. |
| 118 | + let mut bytes = to_vec(&item).map_err(io::Error::other)?; |
| 119 | + bytes.push(b'\n'); |
| 120 | + let response = process.write(bytes).await.map_err(io::Error::other)?; |
| 121 | + match response.status { |
| 122 | + WriteStatus::Accepted => Ok(()), |
| 123 | + WriteStatus::UnknownProcess => { |
| 124 | + Err(io::Error::new(io::ErrorKind::BrokenPipe, "unknown process")) |
| 125 | + } |
| 126 | + WriteStatus::StdinClosed => { |
| 127 | + Err(io::Error::new(io::ErrorKind::BrokenPipe, "stdin closed")) |
| 128 | + } |
| 129 | + WriteStatus::Starting => Err(io::Error::new( |
| 130 | + io::ErrorKind::WouldBlock, |
| 131 | + "process is starting", |
| 132 | + )), |
| 133 | + } |
| 134 | + } |
| 135 | + } |
| 136 | + |
| 137 | + fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<RoleClient>>> + Send { |
| 138 | + self.receive_message() |
| 139 | + } |
| 140 | + |
| 141 | + async fn close(&mut self) -> std::result::Result<(), Self::Error> { |
| 142 | + self.process.terminate().await.map_err(io::Error::other) |
| 143 | + } |
| 144 | +} |
| 145 | + |
| 146 | +// Remote private implementation. |
| 147 | + |
| 148 | +impl ExecutorProcessTransport { |
| 149 | + async fn receive_message(&mut self) -> Option<RxJsonRpcMessage<RoleClient>> { |
| 150 | + loop { |
| 151 | + // rmcp stdio framing is line-oriented JSON. We first drain any |
| 152 | + // complete line already buffered from an earlier process event. |
| 153 | + if let Some(message) = self.take_stdout_message(/*allow_partial*/ self.closed) { |
| 154 | + return Some(message); |
| 155 | + } |
| 156 | + if self.closed { |
| 157 | + self.flush_stderr(); |
| 158 | + return None; |
| 159 | + } |
| 160 | + |
| 161 | + match self.events.recv().await { |
| 162 | + Ok(ExecProcessEvent::Output(chunk)) => { |
| 163 | + self.push_process_output(chunk); |
| 164 | + } |
| 165 | + Ok(ExecProcessEvent::Exited { .. }) => { |
| 166 | + // Wait for `Closed` before ending the rmcp stream so any |
| 167 | + // output flushed during process shutdown can still be |
| 168 | + // decoded into JSON-RPC messages. |
| 169 | + } |
| 170 | + Ok(ExecProcessEvent::Closed { .. }) => { |
| 171 | + self.closed = true; |
| 172 | + } |
| 173 | + Ok(ExecProcessEvent::Failed(message)) => { |
| 174 | + warn!( |
| 175 | + "Remote MCP server process failed ({}): {message}", |
| 176 | + self.program_name |
| 177 | + ); |
| 178 | + self.closed = true; |
| 179 | + } |
| 180 | + Err(broadcast::error::RecvError::Lagged(skipped)) => { |
| 181 | + warn!( |
| 182 | + "Remote MCP server output stream lagged ({}): skipped {skipped} events", |
| 183 | + self.program_name |
| 184 | + ); |
| 185 | + self.closed = true; |
| 186 | + } |
| 187 | + Err(broadcast::error::RecvError::Closed) => { |
| 188 | + self.closed = true; |
| 189 | + } |
| 190 | + } |
| 191 | + } |
| 192 | + } |
| 193 | + |
| 194 | + fn push_process_output(&mut self, chunk: ProcessOutputChunk) { |
| 195 | + let bytes = chunk.chunk.into_inner(); |
| 196 | + match chunk.stream { |
| 197 | + // MCP stdio uses stdout as the protocol stream. PTY output is |
| 198 | + // accepted defensively because the executor process API has a |
| 199 | + // unified stream enum, but remote MCP starts with `tty=false`. |
| 200 | + ExecOutputStream::Stdout | ExecOutputStream::Pty => { |
| 201 | + self.stdout.extend_from_slice(&bytes); |
| 202 | + } |
| 203 | + // Stderr is intentionally out-of-band. It should help debug server |
| 204 | + // startup failures without entering rmcp framing. |
| 205 | + ExecOutputStream::Stderr => { |
| 206 | + self.push_stderr(&bytes); |
| 207 | + } |
| 208 | + } |
| 209 | + } |
| 210 | + |
| 211 | + fn take_stdout_message(&mut self, allow_partial: bool) -> Option<RxJsonRpcMessage<RoleClient>> { |
| 212 | + // A normal MCP stdio server emits one JSON-RPC message per newline. |
| 213 | + // If the process has already closed, accept a final unterminated line |
| 214 | + // so EOF after a complete JSON object behaves like local rmcp's |
| 215 | + // `decode_eof` handling. |
| 216 | + let line_end = self.stdout.iter().position(|byte| *byte == b'\n'); |
| 217 | + let line = match (line_end, allow_partial && !self.stdout.is_empty()) { |
| 218 | + (Some(index), _) => { |
| 219 | + let mut line = self.stdout.drain(..=index).collect::<Vec<_>>(); |
| 220 | + line.pop(); |
| 221 | + line |
| 222 | + } |
| 223 | + (None, true) => self.stdout.drain(..).collect(), |
| 224 | + (None, false) => return None, |
| 225 | + }; |
| 226 | + let line = Self::trim_trailing_carriage_return(line); |
| 227 | + match from_slice::<RxJsonRpcMessage<RoleClient>>(&line) { |
| 228 | + Ok(message) => Some(message), |
| 229 | + Err(error) => { |
| 230 | + debug!( |
| 231 | + "Failed to parse remote MCP server message ({}): {error}", |
| 232 | + self.program_name |
| 233 | + ); |
| 234 | + None |
| 235 | + } |
| 236 | + } |
| 237 | + } |
| 238 | + |
| 239 | + fn push_stderr(&mut self, bytes: &[u8]) { |
| 240 | + // Keep stderr line-oriented in logs so a chatty MCP server does not |
| 241 | + // produce one log record per byte chunk. |
| 242 | + self.stderr.extend_from_slice(bytes); |
| 243 | + while let Some(index) = self.stderr.iter().position(|byte| *byte == b'\n') { |
| 244 | + let mut line = self.stderr.drain(..=index).collect::<Vec<_>>(); |
| 245 | + line.pop(); |
| 246 | + if line.last() == Some(&b'\r') { |
| 247 | + line.pop(); |
| 248 | + } |
| 249 | + info!( |
| 250 | + "MCP server stderr ({}): {}", |
| 251 | + self.program_name, |
| 252 | + String::from_utf8_lossy(&line) |
| 253 | + ); |
| 254 | + } |
| 255 | + } |
| 256 | + |
| 257 | + fn flush_stderr(&mut self) { |
| 258 | + if self.stderr.is_empty() { |
| 259 | + return; |
| 260 | + } |
| 261 | + let line = take(&mut self.stderr); |
| 262 | + info!( |
| 263 | + "MCP server stderr ({}): {}", |
| 264 | + self.program_name, |
| 265 | + String::from_utf8_lossy(&line) |
| 266 | + ); |
| 267 | + } |
| 268 | + |
| 269 | + fn trim_trailing_carriage_return(mut line: Vec<u8>) -> Vec<u8> { |
| 270 | + if line.last() == Some(&b'\r') { |
| 271 | + line.pop(); |
| 272 | + } |
| 273 | + line |
| 274 | + } |
| 275 | +} |
0 commit comments