@@ -35,7 +35,9 @@ use async_channel::Sender;
3535use codex_async_utils:: CancelErr ;
3636use codex_async_utils:: OrCancelExt ;
3737use codex_config:: Constrained ;
38+ use codex_config:: McpServerEnvironment ;
3839use codex_config:: types:: OAuthCredentialsStoreMode ;
40+ use codex_exec_server:: Environment ;
3941use codex_protocol:: ToolName ;
4042use codex_protocol:: approvals:: ElicitationRequest ;
4143use codex_protocol:: approvals:: ElicitationRequestEvent ;
@@ -50,8 +52,11 @@ use codex_protocol::protocol::McpStartupStatus;
5052use codex_protocol:: protocol:: McpStartupUpdateEvent ;
5153use codex_protocol:: protocol:: SandboxPolicy ;
5254use codex_rmcp_client:: ElicitationResponse ;
55+ use codex_rmcp_client:: ExecutorStdioTransportRuntime ;
56+ use codex_rmcp_client:: LocalStdioTransportRuntime ;
5357use codex_rmcp_client:: RmcpClient ;
5458use codex_rmcp_client:: SendElicitation ;
59+ use codex_rmcp_client:: StdioTransportRuntime ;
5560use futures:: future:: BoxFuture ;
5661use futures:: future:: FutureExt ;
5762use futures:: future:: Shared ;
@@ -470,6 +475,24 @@ impl ManagedClient {
470475 }
471476}
472477
478+ /// Builds the stdio runtime for MCP servers that run in the executor
479+ /// environment.
480+ ///
481+ /// The connection manager only decides placement. The returned trait object
482+ /// hides the implementation detail that remote stdio is backed by
483+ /// `process/start` plus process read/write calls, while local stdio is backed by
484+ /// a direct child process.
485+ fn build_remote_stdio_runtime (
486+ environment : Option < Arc < Environment > > ,
487+ session_cwd : PathBuf ,
488+ ) -> Option < Arc < dyn StdioTransportRuntime > > {
489+ let environment = environment?;
490+ Some ( Arc :: new ( ExecutorStdioTransportRuntime :: new (
491+ environment. get_exec_backend ( ) ,
492+ session_cwd,
493+ ) ) )
494+ }
495+
473496#[ derive( Clone ) ]
474497struct AsyncManagedClient {
475498 client : Shared < BoxFuture < ' static , Result < ManagedClient , StartupOutcomeError > > > ,
@@ -491,6 +514,7 @@ impl AsyncManagedClient {
491514 elicitation_requests : ElicitationRequestManager ,
492515 codex_apps_tools_cache_context : Option < CodexAppsToolsCacheContext > ,
493516 tool_plugin_provenance : Arc < ToolPluginProvenance > ,
517+ remote_stdio_runtime : Option < Arc < dyn StdioTransportRuntime > > ,
494518 ) -> Self {
495519 let tool_filter = ToolFilter :: from_config ( & config) ;
496520 let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot (
@@ -507,8 +531,15 @@ impl AsyncManagedClient {
507531 return Err ( error. into ( ) ) ;
508532 }
509533
510- let client =
511- Arc :: new ( make_rmcp_client ( & server_name, config. transport , store_mode) . await ?) ;
534+ let client = Arc :: new (
535+ make_rmcp_client (
536+ & server_name,
537+ config. clone ( ) ,
538+ store_mode,
539+ remote_stdio_runtime,
540+ )
541+ . await ?,
542+ ) ;
512543 match start_server_task (
513544 server_name,
514545 client,
@@ -708,6 +739,8 @@ impl McpConnectionManager {
708739 submit_id : String ,
709740 tx_event : Sender < Event > ,
710741 initial_sandbox_policy : SandboxPolicy ,
742+ environment : Option < Arc < Environment > > ,
743+ remote_stdio_cwd : PathBuf ,
711744 codex_home : PathBuf ,
712745 codex_apps_tools_cache_key : CodexAppsToolsCacheKey ,
713746 tool_plugin_provenance : ToolPluginProvenance ,
@@ -720,6 +753,7 @@ impl McpConnectionManager {
720753 ElicitationRequestManager :: new ( approval_policy. value ( ) , initial_sandbox_policy) ;
721754 let tool_plugin_provenance = Arc :: new ( tool_plugin_provenance) ;
722755 let startup_submit_id = submit_id. clone ( ) ;
756+ let remote_stdio_runtime = build_remote_stdio_runtime ( environment, remote_stdio_cwd) ;
723757 let mcp_servers = mcp_servers. clone ( ) ;
724758 for ( server_name, cfg) in mcp_servers. into_iter ( ) . filter ( |( _, cfg) | cfg. enabled ) {
725759 if let Some ( origin) = transport_origin ( & cfg. transport ) {
@@ -752,6 +786,7 @@ impl McpConnectionManager {
752786 elicitation_requests. clone ( ) ,
753787 codex_apps_tools_cache_context,
754788 Arc :: clone ( & tool_plugin_provenance) ,
789+ remote_stdio_runtime. clone ( ) ,
755790 ) ;
756791 clients. insert ( server_name. clone ( ) , async_managed_client. clone ( ) ) ;
757792 let tx_event = tx_event. clone ( ) ;
@@ -1481,9 +1516,16 @@ struct StartServerTaskParams {
14811516
14821517async fn make_rmcp_client (
14831518 server_name : & str ,
1484- transport : McpServerTransportConfig ,
1519+ config : McpServerConfig ,
14851520 store_mode : OAuthCredentialsStoreMode ,
1521+ remote_stdio_runtime : Option < Arc < dyn StdioTransportRuntime > > ,
14861522) -> Result < RmcpClient , StartupOutcomeError > {
1523+ let McpServerConfig {
1524+ transport,
1525+ environment,
1526+ ..
1527+ } = config;
1528+
14871529 match transport {
14881530 McpServerTransportConfig :: Stdio {
14891531 command,
@@ -1499,7 +1541,22 @@ async fn make_rmcp_client(
14991541 . map ( |( key, value) | ( key. into ( ) , value. into ( ) ) )
15001542 . collect :: < HashMap < _ , _ > > ( )
15011543 } ) ;
1502- RmcpClient :: new_stdio_client ( command_os, args_os, env_os, & env_vars, cwd)
1544+ let runtime = match environment {
1545+ McpServerEnvironment :: Local => {
1546+ Arc :: new ( LocalStdioTransportRuntime ) as Arc < dyn StdioTransportRuntime >
1547+ }
1548+ McpServerEnvironment :: Remote => remote_stdio_runtime. ok_or_else ( || {
1549+ StartupOutcomeError :: from ( anyhow ! (
1550+ "remote MCP server `{server_name}` requires an executor environment"
1551+ ) )
1552+ } ) ?,
1553+ } ;
1554+
1555+ // `RmcpClient` always sees an MCP stdio transport. The runtime
1556+ // trait hides whether that transport was created by spawning a
1557+ // local child process or by asking the executor to start the child
1558+ // and stream its stdin/stdout bytes over the process API.
1559+ RmcpClient :: new_stdio_client ( command_os, args_os, env_os, & env_vars, cwd, runtime)
15031560 . await
15041561 . map_err ( |err| StartupOutcomeError :: from ( anyhow ! ( err) ) )
15051562 }
@@ -1508,23 +1565,34 @@ async fn make_rmcp_client(
15081565 http_headers,
15091566 env_http_headers,
15101567 bearer_token_env_var,
1511- } => {
1512- let resolved_bearer_token =
1513- match resolve_bearer_token ( server_name, bearer_token_env_var. as_deref ( ) ) {
1514- Ok ( token) => token,
1515- Err ( error) => return Err ( error. into ( ) ) ,
1516- } ;
1517- RmcpClient :: new_streamable_http_client (
1518- server_name,
1519- & url,
1520- resolved_bearer_token,
1521- http_headers,
1522- env_http_headers,
1523- store_mode,
1524- )
1525- . await
1526- . map_err ( StartupOutcomeError :: from)
1527- }
1568+ } => match environment {
1569+ McpServerEnvironment :: Local => {
1570+ // Local streamable HTTP remains the existing reqwest path from
1571+ // the orchestrator process.
1572+ let resolved_bearer_token =
1573+ match resolve_bearer_token ( server_name, bearer_token_env_var. as_deref ( ) ) {
1574+ Ok ( token) => token,
1575+ Err ( error) => return Err ( error. into ( ) ) ,
1576+ } ;
1577+ RmcpClient :: new_streamable_http_client (
1578+ server_name,
1579+ & url,
1580+ resolved_bearer_token,
1581+ http_headers,
1582+ env_http_headers,
1583+ store_mode,
1584+ )
1585+ . await
1586+ . map_err ( StartupOutcomeError :: from)
1587+ }
1588+ McpServerEnvironment :: Remote => Err ( StartupOutcomeError :: from ( anyhow ! (
1589+ // Remote HTTP needs the future low-level executor
1590+ // `network/request` API so reqwest runs on the executor side.
1591+ // Do not fall back to local HTTP here; the config explicitly
1592+ // asked for remote placement.
1593+ "remote streamable HTTP MCP server `{server_name}` is not implemented yet"
1594+ ) ) ) ,
1595+ } ,
15281596 }
15291597}
15301598
0 commit comments