@@ -35,7 +35,9 @@ use async_channel::Sender;
3535use codex_async_utils:: CancelErr ;
3636use codex_async_utils:: OrCancelExt ;
3737use codex_config:: Constrained ;
38+ use codex_config:: McpServerEnvironment ;
3839use codex_config:: types:: OAuthCredentialsStoreMode ;
40+ use codex_exec_server:: Environment ;
3941use codex_protocol:: approvals:: ElicitationRequest ;
4042use codex_protocol:: approvals:: ElicitationRequestEvent ;
4143use codex_protocol:: mcp:: CallToolResult ;
@@ -49,8 +51,11 @@ use codex_protocol::protocol::McpStartupStatus;
4951use codex_protocol:: protocol:: McpStartupUpdateEvent ;
5052use codex_protocol:: protocol:: SandboxPolicy ;
5153use codex_rmcp_client:: ElicitationResponse ;
54+ use codex_rmcp_client:: ExecutorStdioTransportRuntime ;
55+ use codex_rmcp_client:: LocalStdioTransportRuntime ;
5256use codex_rmcp_client:: RmcpClient ;
5357use codex_rmcp_client:: SendElicitation ;
58+ use codex_rmcp_client:: StdioTransportRuntime ;
5459use futures:: future:: BoxFuture ;
5560use futures:: future:: FutureExt ;
5661use futures:: future:: Shared ;
@@ -479,6 +484,24 @@ impl ManagedClient {
479484 }
480485}
481486
487+ /// Builds the stdio runtime for MCP servers that run in the executor
488+ /// environment.
489+ ///
490+ /// The connection manager only decides placement. The returned trait object
491+ /// hides the implementation detail that remote stdio is backed by
492+ /// `process/start` plus process read/write calls, while local stdio is backed by
493+ /// a direct child process.
494+ fn build_remote_stdio_runtime (
495+ environment : Option < Arc < Environment > > ,
496+ session_cwd : PathBuf ,
497+ ) -> Option < Arc < dyn StdioTransportRuntime > > {
498+ let environment = environment?;
499+ Some ( Arc :: new ( ExecutorStdioTransportRuntime :: new (
500+ environment. get_exec_backend ( ) ,
501+ session_cwd,
502+ ) ) )
503+ }
504+
482505#[ derive( Clone ) ]
483506struct AsyncManagedClient {
484507 client : Shared < BoxFuture < ' static , Result < ManagedClient , StartupOutcomeError > > > ,
@@ -500,6 +523,7 @@ impl AsyncManagedClient {
500523 elicitation_requests : ElicitationRequestManager ,
501524 codex_apps_tools_cache_context : Option < CodexAppsToolsCacheContext > ,
502525 tool_plugin_provenance : Arc < ToolPluginProvenance > ,
526+ remote_stdio_runtime : Option < Arc < dyn StdioTransportRuntime > > ,
503527 ) -> Self {
504528 let tool_filter = ToolFilter :: from_config ( & config) ;
505529 let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot (
@@ -516,8 +540,15 @@ impl AsyncManagedClient {
516540 return Err ( error. into ( ) ) ;
517541 }
518542
519- let client =
520- Arc :: new ( make_rmcp_client ( & server_name, config. transport , store_mode) . await ?) ;
543+ let client = Arc :: new (
544+ make_rmcp_client (
545+ & server_name,
546+ config. clone ( ) ,
547+ store_mode,
548+ remote_stdio_runtime,
549+ )
550+ . await ?,
551+ ) ;
521552 match start_server_task (
522553 server_name,
523554 client,
@@ -724,6 +755,7 @@ impl McpConnectionManager {
724755 submit_id : String ,
725756 tx_event : Sender < Event > ,
726757 initial_sandbox_state : SandboxState ,
758+ environment : Option < Arc < Environment > > ,
727759 codex_home : PathBuf ,
728760 codex_apps_tools_cache_key : CodexAppsToolsCacheKey ,
729761 tool_plugin_provenance : ToolPluginProvenance ,
@@ -738,6 +770,8 @@ impl McpConnectionManager {
738770 ) ;
739771 let tool_plugin_provenance = Arc :: new ( tool_plugin_provenance) ;
740772 let startup_submit_id = submit_id. clone ( ) ;
773+ let remote_stdio_runtime =
774+ build_remote_stdio_runtime ( environment, initial_sandbox_state. sandbox_cwd . clone ( ) ) ;
741775 let mcp_servers = mcp_servers. clone ( ) ;
742776 for ( server_name, cfg) in mcp_servers. into_iter ( ) . filter ( |( _, cfg) | cfg. enabled ) {
743777 if let Some ( origin) = transport_origin ( & cfg. transport ) {
@@ -770,6 +804,7 @@ impl McpConnectionManager {
770804 elicitation_requests. clone ( ) ,
771805 codex_apps_tools_cache_context,
772806 Arc :: clone ( & tool_plugin_provenance) ,
807+ remote_stdio_runtime. clone ( ) ,
773808 ) ;
774809 clients. insert ( server_name. clone ( ) , async_managed_client. clone ( ) ) ;
775810 let tx_event = tx_event. clone ( ) ;
@@ -1532,9 +1567,16 @@ struct StartServerTaskParams {
15321567
15331568async fn make_rmcp_client (
15341569 server_name : & str ,
1535- transport : McpServerTransportConfig ,
1570+ config : McpServerConfig ,
15361571 store_mode : OAuthCredentialsStoreMode ,
1572+ remote_stdio_runtime : Option < Arc < dyn StdioTransportRuntime > > ,
15371573) -> Result < RmcpClient , StartupOutcomeError > {
1574+ let McpServerConfig {
1575+ transport,
1576+ environment,
1577+ ..
1578+ } = config;
1579+
15381580 match transport {
15391581 McpServerTransportConfig :: Stdio {
15401582 command,
@@ -1550,7 +1592,22 @@ async fn make_rmcp_client(
15501592 . map ( |( key, value) | ( key. into ( ) , value. into ( ) ) )
15511593 . collect :: < HashMap < _ , _ > > ( )
15521594 } ) ;
1553- RmcpClient :: new_stdio_client ( command_os, args_os, env_os, & env_vars, cwd)
1595+ let runtime = match environment {
1596+ McpServerEnvironment :: Local => {
1597+ Arc :: new ( LocalStdioTransportRuntime ) as Arc < dyn StdioTransportRuntime >
1598+ }
1599+ McpServerEnvironment :: Remote => remote_stdio_runtime. ok_or_else ( || {
1600+ StartupOutcomeError :: from ( anyhow ! (
1601+ "remote MCP server `{server_name}` requires an executor environment"
1602+ ) )
1603+ } ) ?,
1604+ } ;
1605+
1606+ // `RmcpClient` always sees an MCP stdio transport. The runtime
1607+ // trait hides whether that transport was created by spawning a
1608+ // local child process or by asking the executor to start the child
1609+ // and stream its stdin/stdout bytes over the process API.
1610+ RmcpClient :: new_stdio_client ( command_os, args_os, env_os, & env_vars, cwd, runtime)
15541611 . await
15551612 . map_err ( |err| StartupOutcomeError :: from ( anyhow ! ( err) ) )
15561613 }
@@ -1559,23 +1616,34 @@ async fn make_rmcp_client(
15591616 http_headers,
15601617 env_http_headers,
15611618 bearer_token_env_var,
1562- } => {
1563- let resolved_bearer_token =
1564- match resolve_bearer_token ( server_name, bearer_token_env_var. as_deref ( ) ) {
1565- Ok ( token) => token,
1566- Err ( error) => return Err ( error. into ( ) ) ,
1567- } ;
1568- RmcpClient :: new_streamable_http_client (
1569- server_name,
1570- & url,
1571- resolved_bearer_token,
1572- http_headers,
1573- env_http_headers,
1574- store_mode,
1575- )
1576- . await
1577- . map_err ( StartupOutcomeError :: from)
1578- }
1619+ } => match environment {
1620+ McpServerEnvironment :: Local => {
1621+ // Local streamable HTTP remains the existing reqwest path from
1622+ // the orchestrator process.
1623+ let resolved_bearer_token =
1624+ match resolve_bearer_token ( server_name, bearer_token_env_var. as_deref ( ) ) {
1625+ Ok ( token) => token,
1626+ Err ( error) => return Err ( error. into ( ) ) ,
1627+ } ;
1628+ RmcpClient :: new_streamable_http_client (
1629+ server_name,
1630+ & url,
1631+ resolved_bearer_token,
1632+ http_headers,
1633+ env_http_headers,
1634+ store_mode,
1635+ )
1636+ . await
1637+ . map_err ( StartupOutcomeError :: from)
1638+ }
1639+ McpServerEnvironment :: Remote => Err ( StartupOutcomeError :: from ( anyhow ! (
1640+ // Remote HTTP needs the future low-level executor
1641+ // `network/request` API so reqwest runs on the executor side.
1642+ // Do not fall back to local HTTP here; the config explicitly
1643+ // asked for remote placement.
1644+ "remote streamable HTTP MCP server `{server_name}` is not implemented yet"
1645+ ) ) ) ,
1646+ } ,
15791647 }
15801648}
15811649
0 commit comments