diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 2c3bc1970cf..87667ae1405 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1719,6 +1719,7 @@ mod tests { request_id: RequestId::Integer(9), params: v2::FsGetMetadataParams { path: absolute_path("tmp/example"), + environment_id: None, }, }; assert_eq!( @@ -1726,7 +1727,8 @@ mod tests { "method": "fs/getMetadata", "id": 9, "params": { - "path": absolute_path_string("tmp/example") + "path": absolute_path_string("tmp/example"), + "environmentId": null } }), serde_json::to_value(&request)?, @@ -1741,6 +1743,7 @@ mod tests { params: v2::FsWatchParams { watch_id: "watch-git".to_string(), path: absolute_path("tmp/repo/.git"), + environment_id: None, }, }; assert_eq!( @@ -1749,7 +1752,8 @@ mod tests { "id": 10, "params": { "watchId": "watch-git", - "path": absolute_path_string("tmp/repo/.git") + "path": absolute_path_string("tmp/repo/.git"), + "environmentId": null } }), serde_json::to_value(&request)?, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index d34456f8aa6..a43a86d7ec2 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2333,6 +2333,9 @@ pub struct EnvironmentListResponse { pub struct FsReadFileParams { /// Absolute path to read. pub path: AbsolutePathBuf, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Base64-encoded file contents returned by `fs/readFile`. @@ -2353,6 +2356,9 @@ pub struct FsWriteFileParams { pub path: AbsolutePathBuf, /// File contents encoded as base64. pub data_base64: String, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Successful response for `fs/writeFile`. @@ -2371,6 +2377,9 @@ pub struct FsCreateDirectoryParams { /// Whether parent directories should also be created. Defaults to `true`. #[ts(optional = nullable)] pub recursive: Option, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Successful response for `fs/createDirectory`. @@ -2386,6 +2395,9 @@ pub struct FsCreateDirectoryResponse {} pub struct FsGetMetadataParams { /// Absolute path to inspect. pub path: AbsolutePathBuf, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Metadata returned by `fs/getMetadata`. @@ -2414,6 +2426,9 @@ pub struct FsGetMetadataResponse { pub struct FsReadDirectoryParams { /// Absolute directory path to read. pub path: AbsolutePathBuf, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// A directory entry returned by `fs/readDirectory`. @@ -2451,6 +2466,9 @@ pub struct FsRemoveParams { /// Whether missing paths should be ignored. Defaults to `true`. #[ts(optional = nullable)] pub force: Option, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Successful response for `fs/remove`. @@ -2471,6 +2489,9 @@ pub struct FsCopyParams { /// Required for directory copies; ignored for file copies. #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub recursive: bool, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Successful response for `fs/copy`. @@ -2488,6 +2509,9 @@ pub struct FsWatchParams { pub watch_id: String, /// Absolute file or directory path to watch. pub path: AbsolutePathBuf, + /// Optional environment selection. Omit to use the default environment. + #[ts(optional = nullable)] + pub environment_id: Option, } /// Successful response for `fs/watch`. @@ -6953,6 +6977,7 @@ mod tests { fn fs_read_file_params_round_trip() { let params = FsReadFileParams { path: absolute_path("tmp/example.txt"), + environment_id: Some("dev".to_string()), }; let value = serde_json::to_value(¶ms).expect("serialize fs/readFile params"); @@ -6960,6 +6985,7 @@ mod tests { value, json!({ "path": absolute_path_string("tmp/example.txt"), + "environmentId": "dev", }) ); @@ -6973,6 +6999,7 @@ mod tests { let params = FsCreateDirectoryParams { path: absolute_path("tmp/example"), recursive: None, + environment_id: Some("dev".to_string()), }; let value = serde_json::to_value(¶ms).expect("serialize fs/createDirectory params"); @@ -6981,6 +7008,7 @@ mod tests { json!({ "path": absolute_path_string("tmp/example"), "recursive": null, + "environmentId": "dev", }) ); @@ -6994,6 +7022,7 @@ mod tests { let params = FsWriteFileParams { path: absolute_path("tmp/example.bin"), data_base64: "AAE=".to_string(), + environment_id: Some("dev".to_string()), }; let value = serde_json::to_value(¶ms).expect("serialize fs/writeFile params"); @@ -7002,6 +7031,7 @@ mod tests { json!({ "path": absolute_path_string("tmp/example.bin"), "dataBase64": "AAE=", + "environmentId": "dev", }) ); @@ -7016,6 +7046,7 @@ mod tests { source_path: absolute_path("tmp/source"), destination_path: absolute_path("tmp/destination"), recursive: true, + environment_id: Some("dev".to_string()), }; let value = serde_json::to_value(¶ms).expect("serialize fs/copy params"); @@ -7025,6 +7056,7 @@ mod tests { "sourcePath": absolute_path_string("tmp/source"), "destinationPath": absolute_path_string("tmp/destination"), "recursive": true, + "environmentId": "dev", }) ); diff --git a/codex-rs/app-server/src/fs_api.rs b/codex-rs/app-server/src/fs_api.rs index a2c71871db7..641d701817d 100644 --- a/codex-rs/app-server/src/fs_api.rs +++ b/codex-rs/app-server/src/fs_api.rs @@ -20,7 +20,7 @@ use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_exec_server::CopyOptions; use codex_exec_server::CreateDirectoryOptions; -use codex_exec_server::Environment; +use codex_exec_server::EnvironmentManager; use codex_exec_server::ExecutorFileSystem; use codex_exec_server::RemoveOptions; use std::io; @@ -28,24 +28,36 @@ use std::sync::Arc; #[derive(Clone)] pub(crate) struct FsApi { - file_system: Arc, + environment_manager: Arc, } -impl Default for FsApi { - fn default() -> Self { +impl FsApi { + pub(crate) fn new(environment_manager: Arc) -> Self { Self { - file_system: Environment::default().get_filesystem(), + environment_manager, } } -} -impl FsApi { + async fn file_system( + &self, + environment_id: Option<&str>, + ) -> Result, JSONRPCErrorError> { + let environment = self + .environment_manager + .environment(environment_id) + .await + .map_err(|err| invalid_request(format!("failed to resolve environment: {err}")))?; + let environment = + environment.ok_or_else(|| invalid_request("the selected environment is disabled"))?; + Ok(environment.get_filesystem()) + } + pub(crate) async fn read_file( &self, params: FsReadFileParams, ) -> Result { - let bytes = self - .file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + let bytes = file_system .read_file(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -58,12 +70,13 @@ impl FsApi { &self, params: FsWriteFileParams, ) -> Result { + let file_system = self.file_system(params.environment_id.as_deref()).await?; let bytes = STANDARD.decode(params.data_base64).map_err(|err| { invalid_request(format!( "fs/writeFile requires valid base64 dataBase64: {err}" )) })?; - self.file_system + file_system .write_file(¶ms.path, bytes, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -74,7 +87,8 @@ impl FsApi { &self, params: FsCreateDirectoryParams, ) -> Result { - self.file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + file_system .create_directory( ¶ms.path, CreateDirectoryOptions { @@ -91,8 +105,8 @@ impl FsApi { &self, params: FsGetMetadataParams, ) -> Result { - let metadata = self - .file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + let metadata = file_system .get_metadata(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -109,8 +123,8 @@ impl FsApi { &self, params: FsReadDirectoryParams, ) -> Result { - let entries = self - .file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + let entries = file_system .read_directory(¶ms.path, /*sandbox*/ None) .await .map_err(map_fs_error)?; @@ -130,7 +144,8 @@ impl FsApi { &self, params: FsRemoveParams, ) -> Result { - self.file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + file_system .remove( ¶ms.path, RemoveOptions { @@ -148,7 +163,8 @@ impl FsApi { &self, params: FsCopyParams, ) -> Result { - self.file_system + let file_system = self.file_system(params.environment_id.as_deref()).await?; + file_system .copy( ¶ms.source_path, ¶ms.destination_path, diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index ff00051472b..ee1ec4cee3f 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -14,6 +14,7 @@ use codex_core::file_watcher::FileWatcherSubscriber; use codex_core::file_watcher::Receiver; use codex_core::file_watcher::WatchPath; use codex_core::file_watcher::WatchRegistration; +use codex_exec_server::EnvironmentManager; use std::collections::HashMap; use std::collections::HashSet; use std::collections::hash_map::Entry; @@ -71,6 +72,7 @@ impl DebouncedReceiver { #[derive(Clone)] pub(crate) struct FsWatchManager { outgoing: Arc, + environment_manager: Arc, file_watcher: Arc, state: Arc>, } @@ -93,7 +95,10 @@ struct WatchKey { } impl FsWatchManager { - pub(crate) fn new(outgoing: Arc) -> Self { + pub(crate) fn new( + outgoing: Arc, + environment_manager: Arc, + ) -> Self { let file_watcher = match FileWatcher::new() { Ok(file_watcher) => Arc::new(file_watcher), Err(err) => { @@ -101,15 +106,17 @@ impl FsWatchManager { Arc::new(FileWatcher::noop()) } }; - Self::new_with_file_watcher(outgoing, file_watcher) + Self::new_with_file_watcher(outgoing, environment_manager, file_watcher) } fn new_with_file_watcher( outgoing: Arc, + environment_manager: Arc, file_watcher: Arc, ) -> Self { Self { outgoing, + environment_manager, file_watcher, state: Arc::new(AsyncMutex::new(FsWatchState::default())), } @@ -120,6 +127,19 @@ impl FsWatchManager { connection_id: ConnectionId, params: FsWatchParams, ) -> Result { + let environment = self + .environment_manager + .environment(params.environment_id.as_deref()) + .await + .map_err(|err| invalid_request(format!("failed to resolve environment: {err}")))?; + let environment = + environment.ok_or_else(|| invalid_request("the selected environment is disabled"))?; + if environment.is_remote() { + return Err(invalid_request( + "fs/watch is unavailable for remote environments", + )); + } + let watch_id = params.watch_id; let watch_key = WatchKey { connection_id, @@ -217,6 +237,7 @@ impl FsWatchManager { #[cfg(test)] mod tests { use super::*; + use codex_exec_server::EnvironmentManager; use codex_utils_absolute_path::AbsolutePathBuf; use pretty_assertions::assert_eq; use tempfile::TempDir; @@ -235,6 +256,7 @@ mod tests { let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER); FsWatchManager::new_with_file_watcher( Arc::new(OutgoingMessageSender::new(tx)), + Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)), Arc::new(FileWatcher::noop()), ) } @@ -254,6 +276,7 @@ mod tests { FsWatchParams { watch_id: watch_id.clone(), path: path.clone(), + environment_id: None, }, ) .await @@ -284,6 +307,7 @@ mod tests { FsWatchParams { watch_id: "watch-head".to_string(), path: absolute_path(head_path), + environment_id: None, }, ) .await @@ -331,6 +355,7 @@ mod tests { FsWatchParams { watch_id: "watch-head".to_string(), path: absolute_path(head_path), + environment_id: None, }, ) .await @@ -342,6 +367,7 @@ mod tests { FsWatchParams { watch_id: "watch-head".to_string(), path: absolute_path(fetch_head_path), + environment_id: None, }, ) .await @@ -368,6 +394,7 @@ mod tests { FsWatchParams { watch_id: "watch-head".to_string(), path: absolute_path(head_path.clone()), + environment_id: None, }, ) .await @@ -378,6 +405,7 @@ mod tests { FsWatchParams { watch_id: "watch-fetch-head".to_string(), path: absolute_path(fetch_head_path), + environment_id: None, }, ) .await @@ -388,6 +416,7 @@ mod tests { FsWatchParams { watch_id: "watch-packed-refs".to_string(), path: absolute_path(packed_refs_path), + environment_id: None, }, ) .await diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index cf706f31602..3a33f4ad8bf 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -323,8 +323,8 @@ impl MessageProcessor { ); let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.to_path_buf()); - let fs_api = FsApi::default(); - let fs_watch_manager = FsWatchManager::new(outgoing.clone()); + let fs_api = FsApi::new(environment_manager.clone()); + let fs_watch_manager = FsWatchManager::new(outgoing.clone(), environment_manager.clone()); Self { outgoing, diff --git a/codex-rs/app-server/tests/suite/v2/fs.rs b/codex-rs/app-server/tests/suite/v2/fs.rs index c7f28f09f55..7d9ac0ac2dc 100644 --- a/codex-rs/app-server/tests/suite/v2/fs.rs +++ b/codex-rs/app-server/tests/suite/v2/fs.rs @@ -4,6 +4,7 @@ use app_test_support::McpProcess; use app_test_support::to_response; use base64::Engine; use base64::engine::general_purpose::STANDARD; +use codex_app_server_protocol::EnvironmentRegisterParams; use codex_app_server_protocol::FsChangedNotification; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsGetMetadataResponse; @@ -69,6 +70,7 @@ async fn fs_get_metadata_returns_only_used_fields() -> Result<()> { let request_id = mcp .send_fs_get_metadata_request(codex_app_server_protocol::FsGetMetadataParams { path: absolute_path(file_path.clone()), + environment_id: None, }) .await?; let response = timeout( @@ -126,6 +128,7 @@ async fn fs_get_metadata_reports_symlink() -> Result<()> { let request_id = mcp .send_fs_get_metadata_request(codex_app_server_protocol::FsGetMetadataParams { path: absolute_path(symlink_path), + environment_id: None, }) .await?; let response = timeout( @@ -158,6 +161,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { .send_fs_create_directory_request(codex_app_server_protocol::FsCreateDirectoryParams { path: absolute_path(nested_dir.clone()), recursive: None, + environment_id: None, }) .await?; timeout( @@ -170,6 +174,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { .send_fs_write_file_request(FsWriteFileParams { path: absolute_path(nested_file.clone()), data_base64: STANDARD.encode("hello from app-server"), + environment_id: None, }) .await?; timeout( @@ -182,6 +187,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { .send_fs_write_file_request(FsWriteFileParams { path: absolute_path(source_file.clone()), data_base64: STANDARD.encode("hello from source root"), + environment_id: None, }) .await?; timeout( @@ -193,6 +199,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { let read_request_id = mcp .send_fs_read_file_request(codex_app_server_protocol::FsReadFileParams { path: absolute_path(nested_file.clone()), + environment_id: None, }) .await?; let read_response: FsReadFileResponse = to_response( @@ -214,6 +221,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { source_path: absolute_path(nested_file.clone()), destination_path: absolute_path(copy_file_path.clone()), recursive: false, + environment_id: None, }) .await?; timeout( @@ -231,6 +239,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { source_path: absolute_path(source_dir.clone()), destination_path: absolute_path(copied_dir.clone()), recursive: true, + environment_id: None, }) .await?; timeout( @@ -246,6 +255,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { let read_directory_request_id = mcp .send_fs_read_directory_request(codex_app_server_protocol::FsReadDirectoryParams { path: absolute_path(source_dir.clone()), + environment_id: None, }) .await?; let readdir_response = timeout( @@ -278,6 +288,7 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { path: absolute_path(copied_dir.clone()), recursive: None, force: None, + environment_id: None, }) .await?; timeout( @@ -293,6 +304,85 @@ async fn fs_methods_cover_current_fs_utils_surface() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn fs_methods_support_named_local_environment() -> Result<()> { + let codex_home = TempDir::new()?; + let file_path = codex_home.path().join("named-env.txt"); + + let mut mcp = initialized_mcp(&codex_home).await?; + let register_request_id = mcp + .send_environment_register_request(EnvironmentRegisterParams { + environment_id: "local".to_string(), + exec_server_url: None, + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(register_request_id)), + ) + .await??; + + let write_request_id = mcp + .send_fs_write_file_request(FsWriteFileParams { + path: absolute_path(file_path.clone()), + data_base64: STANDARD.encode("hello from named env"), + environment_id: Some("local".to_string()), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(write_request_id)), + ) + .await??; + + let read_request_id = mcp + .send_fs_read_file_request(codex_app_server_protocol::FsReadFileParams { + path: absolute_path(file_path.clone()), + environment_id: Some("local".to_string()), + }) + .await?; + let read_response: FsReadFileResponse = to_response( + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_request_id)), + ) + .await??, + )?; + assert_eq!( + read_response, + FsReadFileResponse { + data_base64: STANDARD.encode("hello from named env"), + } + ); + assert_eq!(std::fs::read_to_string(file_path)?, "hello from named env"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn fs_methods_reject_disabled_default_environment() -> Result<()> { + let codex_home = TempDir::new()?; + let file_path = codex_home.path().join("disabled.txt"); + std::fs::write(&file_path, "hello")?; + + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[("CODEX_EXEC_SERVER_URL", Some("none"))], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_fs_read_file_request(codex_app_server_protocol::FsReadFileParams { + path: absolute_path(file_path), + environment_id: None, + }) + .await?; + expect_error_message(&mut mcp, request_id, "the selected environment is disabled").await?; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn fs_write_file_accepts_base64_bytes() -> Result<()> { let codex_home = TempDir::new()?; @@ -304,6 +394,7 @@ async fn fs_write_file_accepts_base64_bytes() -> Result<()> { .send_fs_write_file_request(FsWriteFileParams { path: absolute_path(file_path.clone()), data_base64: STANDARD.encode(bytes), + environment_id: None, }) .await?; timeout( @@ -316,6 +407,7 @@ async fn fs_write_file_accepts_base64_bytes() -> Result<()> { let read_request_id = mcp .send_fs_read_file_request(codex_app_server_protocol::FsReadFileParams { path: absolute_path(file_path), + environment_id: None, }) .await?; let read_response: FsReadFileResponse = to_response( @@ -345,6 +437,7 @@ async fn fs_write_file_rejects_invalid_base64() -> Result<()> { .send_fs_write_file_request(FsWriteFileParams { path: absolute_path(file_path), data_base64: "%%%".to_string(), + environment_id: None, }) .await?; let error = timeout( @@ -500,6 +593,7 @@ async fn fs_copy_rejects_directory_without_recursive() -> Result<()> { source_path: absolute_path(source_dir), destination_path: absolute_path(codex_home.path().join("dest")), recursive: false, + environment_id: None, }) .await?; let error = timeout( @@ -527,6 +621,7 @@ async fn fs_copy_rejects_copying_directory_into_descendant() -> Result<()> { source_path: absolute_path(source_dir.clone()), destination_path: absolute_path(source_dir.join("nested").join("copy")), recursive: true, + environment_id: None, }) .await?; let error = timeout( @@ -558,6 +653,7 @@ async fn fs_copy_preserves_symlinks_in_recursive_copy() -> Result<()> { source_path: absolute_path(source_dir), destination_path: absolute_path(copied_dir.clone()), recursive: true, + environment_id: None, }) .await?; timeout( @@ -598,6 +694,7 @@ async fn fs_copy_ignores_unknown_special_files_in_recursive_copy() -> Result<()> source_path: absolute_path(source_dir), destination_path: absolute_path(copied_dir.clone()), recursive: true, + environment_id: None, }) .await?; timeout( @@ -635,6 +732,7 @@ async fn fs_copy_rejects_standalone_fifo_source() -> Result<()> { source_path: absolute_path(fifo_path), destination_path: absolute_path(codex_home.path().join("copied")), recursive: false, + environment_id: None, }) .await?; expect_error_message( @@ -662,6 +760,7 @@ async fn fs_watch_directory_reports_changed_child_paths_and_unwatch_stops_notifi .send_fs_watch_request(codex_app_server_protocol::FsWatchParams { watch_id: watch_id.clone(), path: absolute_path(git_dir.clone()), + environment_id: None, }) .await?; let watch_response: FsWatchResponse = to_response( @@ -730,6 +829,7 @@ async fn fs_watch_file_reports_atomic_replace_events() -> Result<()> { .send_fs_watch_request(codex_app_server_protocol::FsWatchParams { watch_id: watch_id.clone(), path: absolute_path(head_path.clone()), + environment_id: None, }) .await?; let watch_response: FsWatchResponse = to_response( @@ -769,6 +869,7 @@ async fn fs_watch_allows_missing_file_targets() -> Result<()> { .send_fs_watch_request(codex_app_server_protocol::FsWatchParams { watch_id: watch_id.clone(), path: absolute_path(fetch_head.clone()), + environment_id: None, }) .await?; let watch_response: FsWatchResponse = to_response(