From 021c8b0a36b4c91ab71e0c8339409641a0dcd250 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Wed, 15 Apr 2026 15:06:47 -0700 Subject: [PATCH] Thread environment selection through core spawn Co-authored-by: Codex --- codex-rs/core/src/codex.rs | 4 +- codex-rs/core/src/codex_delegate.rs | 1 + codex-rs/core/src/codex_tests_guardian.rs | 1 + codex-rs/core/src/thread_manager.rs | 18 +++- codex-rs/core/tests/common/test_codex.rs | 56 +++++++++- codex-rs/core/tests/suite/code_mode.rs | 1 + codex-rs/core/tests/suite/remote_env.rs | 125 ++++++++++++++++++++++ 7 files changed, 200 insertions(+), 6 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index cc314a66e9b..1f188c74555 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -427,6 +427,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) auth_manager: Arc, pub(crate) models_manager: Arc, pub(crate) environment_manager: Arc, + pub(crate) environment_id: Option, pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, @@ -480,6 +481,7 @@ impl Codex { auth_manager, models_manager, environment_manager, + environment_id, skills_manager, plugins_manager, mcp_manager, @@ -500,7 +502,7 @@ impl Codex { let (tx_event, rx_event) = async_channel::unbounded(); let environment = environment_manager - .current() + .environment(environment_id.as_deref()) .await .map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?; let fs = environment diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 9cd58e044fd..eda1c13d183 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -82,6 +82,7 @@ pub(crate) async fn run_codex_thread_interactive( environment_manager: Arc::new(EnvironmentManager::from_environment( parent_ctx.environment.as_deref(), )), + environment_id: None, skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), diff --git a/codex-rs/core/src/codex_tests_guardian.rs b/codex-rs/core/src/codex_tests_guardian.rs index cad67dcc8fd..2de4ed613fd 100644 --- a/codex-rs/core/src/codex_tests_guardian.rs +++ b/codex-rs/core/src/codex_tests_guardian.rs @@ -435,6 +435,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { auth_manager, models_manager, environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), + environment_id: None, skills_manager, plugins_manager, mcp_manager, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index b8ad75b49e1..ff1ce3cb6c6 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -471,6 +471,7 @@ impl ThreadManager { config, Vec::new(), /*persist_extended_history*/ false, + /*environment_id*/ None, )) .await } @@ -480,6 +481,7 @@ impl ThreadManager { config: Config, dynamic_tools: Vec, persist_extended_history: bool, + environment_id: Option, ) -> CodexResult { Box::pin(self.start_thread_with_tools_and_service_name( config, @@ -488,6 +490,7 @@ impl ThreadManager { persist_extended_history, /*metrics_service_name*/ None, /*parent_trace*/ None, + environment_id, )) .await } @@ -500,6 +503,7 @@ impl ThreadManager { persist_extended_history: bool, metrics_service_name: Option, parent_trace: Option, + environment_id: Option, ) -> CodexResult { Box::pin(self.state.spawn_thread( config, @@ -511,6 +515,7 @@ impl ThreadManager { metrics_service_name, parent_trace, /*user_shell_override*/ None, + environment_id, )) .await } @@ -551,6 +556,7 @@ impl ThreadManager { /*metrics_service_name*/ None, parent_trace, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -570,6 +576,7 @@ impl ThreadManager { /*metrics_service_name*/ None, /*parent_trace*/ None, /*user_shell_override*/ Some(user_shell_override), + /*environment_id*/ None, )) .await } @@ -592,6 +599,7 @@ impl ThreadManager { /*metrics_service_name*/ None, /*parent_trace*/ None, /*user_shell_override*/ Some(user_shell_override), + /*environment_id*/ None, )) .await } @@ -700,6 +708,7 @@ impl ThreadManager { /*metrics_service_name*/ None, parent_trace, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -801,6 +810,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -828,6 +838,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -856,6 +867,7 @@ impl ThreadManagerState { inherited_exec_policy, /*parent_trace*/ None, /*user_shell_override*/ None, + /*environment_id*/ None, )) .await } @@ -873,6 +885,7 @@ impl ThreadManagerState { metrics_service_name: Option, parent_trace: Option, user_shell_override: Option, + environment_id: Option, ) -> CodexResult { Box::pin(self.spawn_thread_with_source( config, @@ -887,6 +900,7 @@ impl ThreadManagerState { /*inherited_exec_policy*/ None, parent_trace, user_shell_override, + environment_id, )) .await } @@ -906,10 +920,11 @@ impl ThreadManagerState { inherited_exec_policy: Option>, parent_trace: Option, user_shell_override: Option, + environment_id: Option, ) -> CodexResult { let environment = self .environment_manager - .current() + .environment(environment_id.as_deref()) .await .map_err(|err| CodexErr::Fatal(format!("failed to create environment: {err}")))?; let watch_registration = match environment.as_ref() { @@ -932,6 +947,7 @@ impl ThreadManagerState { auth_manager, models_manager: Arc::clone(&self.models_manager), environment_manager: Arc::clone(&self.environment_manager), + environment_id, skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index 92176c338f3..e1136e71668 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -202,6 +202,8 @@ pub struct TestCodexBuilder { workspace_setups: Vec>, home: Option>, user_shell_override: Option, + environment_manager_override: Option>, + thread_environment_id: Option, } impl TestCodexBuilder { @@ -253,6 +255,19 @@ impl TestCodexBuilder { self } + pub fn with_environment_manager( + mut self, + environment_manager: Arc, + ) -> Self { + self.environment_manager_override = Some(environment_manager); + self + } + + pub fn with_thread_environment_id(mut self, environment_id: impl Into) -> Self { + self.thread_environment_id = Some(environment_id.into()); + self + } + pub fn with_windows_cmd_shell(self) -> Self { if cfg!(windows) { self.with_user_shell(get_shell_by_model_provided_path(&PathBuf::from("cmd.exe"))) @@ -348,9 +363,17 @@ impl TestCodexBuilder { let (config, fallback_cwd) = self .prepare_config(base_url, &home, test_env.cwd().clone()) .await?; - let environment_manager = Arc::new(codex_exec_server::EnvironmentManager::new( - test_env.exec_server_url().map(str::to_owned), - )); + let environment_manager = self + .environment_manager_override + .clone() + .unwrap_or_else(|| { + Arc::new(codex_exec_server::EnvironmentManager::new( + test_env.exec_server_url().map(str::to_owned), + )) + }); + let selected_environment = environment_manager + .environment(self.thread_environment_id.as_deref()) + .await?; let file_system = test_env.environment().get_filesystem(); let mut workspace_setups = vec![]; swap(&mut self.workspace_setups, &mut workspace_setups); @@ -365,6 +388,7 @@ impl TestCodexBuilder { resume_from, test_env, environment_manager, + selected_environment, )) .await } @@ -377,6 +401,7 @@ impl TestCodexBuilder { resume_from: Option, test_env: TestEnv, environment_manager: Arc, + selected_environment: Option>, ) -> anyhow::Result { let auth = self.auth.clone(); let thread_manager = if config.model_catalog.is_some() { @@ -398,8 +423,15 @@ impl TestCodexBuilder { }; let thread_manager = Arc::new(thread_manager); let user_shell_override = self.user_shell_override.clone(); + let thread_environment_id = self.thread_environment_id.clone(); let new_conversation = match (resume_from, user_shell_override) { + (Some(_), _) if thread_environment_id.is_some() => { + anyhow::bail!("test harness does not support resuming with thread_environment_id") + } + (_, Some(_)) if thread_environment_id.is_some() => anyhow::bail!( + "test harness does not support user_shell_override with thread_environment_id" + ), (Some(path), Some(user_shell_override)) => { let auth_manager = codex_core::test_support::auth_manager_from_auth(auth); Box::pin( @@ -433,7 +465,15 @@ impl TestCodexBuilder { ) .await? } - (None, None) => Box::pin(thread_manager.start_thread(config.clone())).await?, + (None, None) => { + Box::pin(thread_manager.start_thread_with_tools( + config.clone(), + Vec::new(), + /*persist_extended_history*/ false, + thread_environment_id, + )) + .await? + } }; Ok(TestCodex { @@ -443,6 +483,7 @@ impl TestCodexBuilder { codex: new_conversation.thread, session_configured: new_conversation.session_configured, thread_manager, + selected_environment, _test_env: test_env, }) } @@ -533,6 +574,7 @@ pub struct TestCodex { pub session_configured: SessionConfiguredEvent, pub config: Config, pub thread_manager: Arc, + selected_environment: Option>, _test_env: TestEnv, } @@ -553,6 +595,10 @@ impl TestCodex { &self._test_env } + pub fn selected_environment(&self) -> Option<&codex_exec_server::Environment> { + self.selected_environment.as_deref() + } + pub fn fs(&self) -> Arc { self._test_env.environment().get_filesystem() } @@ -879,6 +925,8 @@ pub fn test_codex() -> TestCodexBuilder { workspace_setups: vec![], home: None, user_shell_override: None, + environment_manager_override: None, + thread_environment_id: None, } } diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs index f010c9e5876..e1994a563ae 100644 --- a/codex-rs/core/tests/suite/code_mode.rs +++ b/codex-rs/core/tests/suite/code_mode.rs @@ -2364,6 +2364,7 @@ async fn code_mode_can_call_hidden_dynamic_tools() -> Result<()> { defer_loading: true, }], /*persist_extended_history*/ false, + /*environment_id*/ None, ) .await?; let mut test = base_test; diff --git a/codex-rs/core/tests/suite/remote_env.rs b/codex-rs/core/tests/suite/remote_env.rs index 36c9b35c30f..33df2d1c978 100644 --- a/codex-rs/core/tests/suite/remote_env.rs +++ b/codex-rs/core/tests/suite/remote_env.rs @@ -2,6 +2,8 @@ use anyhow::Context; use anyhow::Result; use codex_exec_server::CopyOptions; use codex_exec_server::CreateDirectoryOptions; +use codex_exec_server::EnvironmentConfig; +use codex_exec_server::EnvironmentManager; use codex_exec_server::FileSystemSandboxContext; use codex_exec_server::RemoveOptions; use codex_protocol::protocol::ReadOnlyAccess; @@ -9,14 +11,137 @@ use codex_protocol::protocol::SandboxPolicy; use codex_utils_absolute_path::AbsolutePathBuf; use core_test_support::PathBufExt; use core_test_support::get_remote_test_env; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::mount_sse_once; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; +use core_test_support::test_codex::test_codex; use core_test_support::test_codex::test_env; use pretty_assertions::assert_eq; use std::path::PathBuf; use std::process::Command; +use std::sync::Arc; use std::time::SystemTime; use std::time::UNIX_EPOCH; +const REMOTE_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_TEST_REMOTE_EXEC_SERVER_URL"; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_can_start_and_complete_turn_with_disabled_default_environment() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let _mock = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-disabled"), + ev_assistant_message("msg-disabled", "done"), + ev_completed("resp-disabled"), + ]), + ) + .await; + + let environment_manager = Arc::new(EnvironmentManager::new(Some("none".to_string()))); + let mut builder = test_codex().with_environment_manager(environment_manager); + let test = builder.build(&server).await?; + assert!(test.selected_environment().is_none()); + + test.submit_turn("hello from disabled env").await?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_can_start_and_complete_turn_with_named_local_environment() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let _mock = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-local"), + ev_assistant_message("msg-local", "done"), + ev_completed("resp-local"), + ]), + ) + .await; + + let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)); + environment_manager + .register_environment( + "local".to_string(), + EnvironmentConfig { + exec_server_url: None, + }, + ) + .await?; + + let mut builder = test_codex() + .with_environment_manager(environment_manager) + .with_thread_environment_id("local"); + let test = builder.build(&server).await?; + let selected_environment = test + .selected_environment() + .context("named local environment should resolve")?; + assert!(!selected_environment.is_remote()); + + test.submit_turn("hello from named local env").await?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_can_start_and_complete_turn_with_named_remote_environment() -> Result<()> { + skip_if_no_network!(Ok(())); + let Some(_remote_env) = get_remote_test_env() else { + return Ok(()); + }; + + let remote_exec_server_url = + std::env::var(REMOTE_EXEC_SERVER_URL_ENV_VAR).with_context(|| { + format!( + "{REMOTE_EXEC_SERVER_URL_ENV_VAR} must be set for named remote environment tests" + ) + })?; + + let server = start_mock_server().await; + let _mock = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-remote"), + ev_assistant_message("msg-remote", "done"), + ev_completed("resp-remote"), + ]), + ) + .await; + + let environment_manager = Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)); + environment_manager + .register_environment( + "remote".to_string(), + EnvironmentConfig { + exec_server_url: Some(remote_exec_server_url), + }, + ) + .await?; + + let mut builder = test_codex() + .with_environment_manager(environment_manager) + .with_thread_environment_id("remote"); + let test = builder.build_remote_aware(&server).await?; + let selected_environment = test + .selected_environment() + .context("named remote environment should resolve")?; + assert!(selected_environment.is_remote()); + + test.submit_turn("hello from named remote env").await?; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_test_env_can_connect_and_use_filesystem() -> Result<()> { let Some(_remote_env) = get_remote_test_env() else {