diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d177e1df1c6..4a826ccfdb7 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2367,6 +2367,7 @@ dependencies = [ "async-channel", "codex-async-utils", "codex-config", + "codex-exec-server", "codex-login", "codex-otel", "codex-plugin", diff --git a/codex-rs/codex-mcp/Cargo.toml b/codex-rs/codex-mcp/Cargo.toml index adc38d4093a..0aec1f3aaf6 100644 --- a/codex-rs/codex-mcp/Cargo.toml +++ b/codex-rs/codex-mcp/Cargo.toml @@ -16,6 +16,7 @@ anyhow = { workspace = true } async-channel = { workspace = true } codex-async-utils = { workspace = true } codex-config = { workspace = true } +codex-exec-server = { workspace = true } codex-login = { workspace = true } codex-otel = { workspace = true } codex-plugin = { workspace = true } diff --git a/codex-rs/codex-mcp/src/mcp/mod.rs b/codex-rs/codex-mcp/src/mcp/mod.rs index 9b21bd5a57f..4c21124f48b 100644 --- a/codex-rs/codex-mcp/src/mcp/mod.rs +++ b/codex-rs/codex-mcp/src/mcp/mod.rs @@ -355,6 +355,8 @@ pub async fn collect_mcp_snapshot_with_detail( submit_id, tx_event, SandboxPolicy::new_read_only_policy(), + None, + config.codex_home.clone(), config.codex_home.clone(), codex_apps_tools_cache_key(auth), tool_plugin_provenance, @@ -421,6 +423,8 @@ pub async fn collect_mcp_server_status_snapshot_with_detail( submit_id, tx_event, SandboxPolicy::new_read_only_policy(), + None, + config.codex_home.clone(), config.codex_home.clone(), codex_apps_tools_cache_key(auth), tool_plugin_provenance, diff --git a/codex-rs/codex-mcp/src/mcp_connection_manager.rs b/codex-rs/codex-mcp/src/mcp_connection_manager.rs index 093f8dac9a2..5b0f481880c 100644 --- a/codex-rs/codex-mcp/src/mcp_connection_manager.rs +++ b/codex-rs/codex-mcp/src/mcp_connection_manager.rs @@ -35,7 +35,9 @@ use async_channel::Sender; use codex_async_utils::CancelErr; use codex_async_utils::OrCancelExt; use codex_config::Constrained; +use codex_config::McpServerEnvironment; use codex_config::types::OAuthCredentialsStoreMode; +use codex_exec_server::Environment; use codex_protocol::ToolName; use codex_protocol::approvals::ElicitationRequest; use codex_protocol::approvals::ElicitationRequestEvent; @@ -50,6 +52,7 @@ use codex_protocol::protocol::McpStartupStatus; use codex_protocol::protocol::McpStartupUpdateEvent; use codex_protocol::protocol::SandboxPolicy; use codex_rmcp_client::ElicitationResponse; +use codex_rmcp_client::ExecutorStdioServerLauncher; use codex_rmcp_client::LocalStdioServerLauncher; use codex_rmcp_client::RmcpClient; use codex_rmcp_client::SendElicitation; @@ -493,6 +496,8 @@ impl AsyncManagedClient { elicitation_requests: ElicitationRequestManager, codex_apps_tools_cache_context: Option, tool_plugin_provenance: Arc, + environment: Option>, + remote_stdio_cwd: PathBuf, ) -> Self { let tool_filter = ToolFilter::from_config(&config); let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( @@ -509,8 +514,16 @@ impl AsyncManagedClient { return Err(error.into()); } - let client = - Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?); + let client = Arc::new( + make_rmcp_client( + &server_name, + config.clone(), + store_mode, + environment, + remote_stdio_cwd, + ) + .await?, + ); match start_server_task( server_name, client, @@ -710,6 +723,8 @@ impl McpConnectionManager { submit_id: String, tx_event: Sender, initial_sandbox_policy: SandboxPolicy, + environment: Option>, + remote_stdio_cwd: PathBuf, codex_home: PathBuf, codex_apps_tools_cache_key: CodexAppsToolsCacheKey, tool_plugin_provenance: ToolPluginProvenance, @@ -754,6 +769,8 @@ impl McpConnectionManager { elicitation_requests.clone(), codex_apps_tools_cache_context, Arc::clone(&tool_plugin_provenance), + environment.clone(), + remote_stdio_cwd.clone(), ); clients.insert(server_name.clone(), async_managed_client.clone()); let tx_event = tx_event.clone(); @@ -1483,9 +1500,17 @@ struct StartServerTaskParams { async fn make_rmcp_client( server_name: &str, - transport: McpServerTransportConfig, + config: McpServerConfig, store_mode: OAuthCredentialsStoreMode, + exec_environment: Option>, + remote_stdio_cwd: PathBuf, ) -> Result { + let McpServerConfig { + transport, + environment, + .. + } = config; + match transport { McpServerTransportConfig::Stdio { command, @@ -1501,7 +1526,26 @@ async fn make_rmcp_client( .map(|(key, value)| (key.into(), value.into())) .collect::>() }); - let launcher = Arc::new(LocalStdioServerLauncher) as Arc; + let launcher = match environment { + McpServerEnvironment::Local => { + Arc::new(LocalStdioServerLauncher) as Arc + } + McpServerEnvironment::Remote => { + let exec_environment = exec_environment.ok_or_else(|| { + StartupOutcomeError::from(anyhow!( + "remote MCP server `{server_name}` requires an executor environment" + )) + })?; + Arc::new(ExecutorStdioServerLauncher::new( + exec_environment.get_exec_backend(), + remote_stdio_cwd, + )) + } + }; + + // `RmcpClient` always sees a launched MCP stdio server. The + // launcher hides whether that means a local child process or an + // executor process whose stdin/stdout bytes cross the process API. RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher) .await .map_err(|err| StartupOutcomeError::from(anyhow!(err))) @@ -1511,23 +1555,34 @@ async fn make_rmcp_client( http_headers, env_http_headers, bearer_token_env_var, - } => { - let resolved_bearer_token = - match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) { - Ok(token) => token, - Err(error) => return Err(error.into()), - }; - RmcpClient::new_streamable_http_client( - server_name, - &url, - resolved_bearer_token, - http_headers, - env_http_headers, - store_mode, - ) - .await - .map_err(StartupOutcomeError::from) - } + } => match environment { + McpServerEnvironment::Local => { + // Local streamable HTTP remains the existing reqwest path from + // the orchestrator process. + let resolved_bearer_token = + match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) { + Ok(token) => token, + Err(error) => return Err(error.into()), + }; + RmcpClient::new_streamable_http_client( + server_name, + &url, + resolved_bearer_token, + http_headers, + env_http_headers, + store_mode, + ) + .await + .map_err(StartupOutcomeError::from) + } + McpServerEnvironment::Remote => Err(StartupOutcomeError::from(anyhow!( + // Remote HTTP needs the future low-level executor + // `network/request` API so reqwest runs on the executor side. + // Do not fall back to local HTTP here; the config explicitly + // asked for remote placement. + "remote streamable HTTP MCP server `{server_name}` is not implemented yet" + ))), + }, } } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 66a0dc527ef..7744ae6a860 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2153,7 +2153,7 @@ impl Session { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), - environment, + environment: environment.clone(), }; services .model_client @@ -2247,6 +2247,8 @@ impl Session { INITIAL_SUBMIT_ID.to_owned(), tx_event.clone(), session_configuration.sandbox_policy.get().clone(), + environment.clone(), + session_configuration.cwd.to_path_buf(), config.codex_home.to_path_buf(), codex_apps_tools_cache_key(auth), tool_plugin_provenance, @@ -4583,6 +4585,8 @@ impl Session { turn_context.sub_id.clone(), self.get_tx_event(), turn_context.sandbox_policy.get().clone(), + turn_context.environment.clone(), + turn_context.cwd.to_path_buf(), config.codex_home.to_path_buf(), codex_apps_tools_cache_key(auth.as_ref()), tool_plugin_provenance, diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 77603adcf18..c16ad9f1aca 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -233,6 +233,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status( INITIAL_SUBMIT_ID.to_owned(), tx_event, SandboxPolicy::new_read_only_policy(), + None, + config.codex_home.to_path_buf(), config.codex_home.to_path_buf(), codex_apps_tools_cache_key(auth.as_ref()), ToolPluginProvenance::default(),