diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index f31aafeac52..be9087b67c2 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -6092,26 +6092,23 @@ impl CodexMessageProcessor { ); let cached_all_connectors = all_connectors.clone(); - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); - let accessible_config = config.clone(); - let accessible_tx = tx.clone(); - tokio::spawn(async move { + let mut accessible_task = tokio::spawn(async move { let result = connectors::list_accessible_connectors_from_mcp_tools_with_options( &accessible_config, force_refetch, ) .await .map_err(|err| format!("failed to load accessible apps: {err}")); - let _ = accessible_tx.send(AppListLoadResult::Accessible(result)); + AppListLoadResult::Accessible(result) }); let all_config = config.clone(); - tokio::spawn(async move { + let mut directory_task = tokio::spawn(async move { let result = connectors::list_all_connectors_with_options(&all_config, force_refetch) .await .map_err(|err| format!("failed to list apps: {err}")); - let _ = tx.send(AppListLoadResult::Directory(result)); + AppListLoadResult::Directory(result) }); let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT; @@ -6139,18 +6136,37 @@ impl CodexMessageProcessor { } loop { - let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await { - Ok(Some(result)) => result, - Ok(None) => { + let result = match tokio::time::timeout_at(app_list_deadline, async { + tokio::select! { + result = &mut accessible_task, if !accessible_loaded => result, + result = &mut directory_task, if !all_loaded => result, + } + }) + .await + { + Ok(Ok(result)) => result, + Ok(Err(err)) => { + if !accessible_loaded { + accessible_task.abort(); + } + if !all_loaded { + directory_task.abort(); + } let error = JSONRPCErrorError { code: INTERNAL_ERROR_CODE, - message: "failed to load app lists".to_string(), + message: format!("failed to load app lists: {err}"), data: None, }; outgoing.send_error(request_id, error).await; return; } Err(_) => { + if !accessible_loaded { + accessible_task.abort(); + } + if !all_loaded { + directory_task.abort(); + } let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs(); let error = JSONRPCErrorError { code: INTERNAL_ERROR_CODE, @@ -6170,6 +6186,9 @@ impl CodexMessageProcessor { accessible_loaded = true; } AppListLoadResult::Accessible(Err(err)) => { + if !all_loaded { + directory_task.abort(); + } let error = JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message: err, @@ -6183,6 +6202,9 @@ impl CodexMessageProcessor { all_loaded = true; } AppListLoadResult::Directory(Err(err)) => { + if !accessible_loaded { + accessible_task.abort(); + } let error = JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message: err, diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index 57a27961af6..f6c2b6f5806 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -61,6 +61,13 @@ const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); #[tokio::test] async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { let codex_home = TempDir::new()?; + std::fs::write( + codex_home.path().join("config.toml"), + r#" +[features] +connectors = false +"#, + )?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -876,8 +883,13 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result plugin_display_names: Vec::new(), }]; let tools = vec![connector_tool("beta", "Beta App")?]; - let (server_url, server_handle) = - start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?; + let (server_url, server_handle, server_control) = start_apps_server_with_delays_and_control( + connectors, + tools, + Duration::ZERO, + Duration::from_millis(150), + ) + .await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; @@ -922,6 +934,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result .chatgpt_account_id("account-123"), AuthCredentialsStoreMode::File, )?; + server_control.set_tools(Vec::new()); let refetch_request = mcp .send_apps_list_request(AppsListParams { @@ -938,6 +951,8 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result .await??; assert!(refetch_error.error.message.contains("failed to")); + tokio::time::sleep(Duration::from_millis(250)).await; + let cached_request = mcp .send_apps_list_request(AppsListParams { limit: None,