From 8e4f8d6bff01113c74c41e2a947d03757ad2d3ea Mon Sep 17 00:00:00 2001 From: adrian Date: Wed, 15 Apr 2026 12:02:06 -0700 Subject: [PATCH 1/4] Persist and prewarm agent tasks per thread --- .../src/protocol/thread_history.rs | 4 +- .../tests/suite/v2/client_metadata.rs | 2 +- .../tests/suite/v2/collaboration_mode_list.rs | 2 +- .../suite/v2/connection_handling_websocket.rs | 2 +- .../app-server/tests/suite/v2/initialize.rs | 2 +- .../tests/suite/v2/realtime_conversation.rs | 4 +- codex-rs/core/src/agent/control.rs | 1 + codex-rs/core/src/agent/control_tests.rs | 2 +- codex-rs/core/src/agent_identity.rs | 13 +- .../src/agent_identity/task_registration.rs | 42 +-- codex-rs/core/src/codex.rs | 94 +---- .../core/src/codex/agent_task_lifecycle.rs | 157 ++++++++ .../core/src/codex/rollout_reconstruction.rs | 5 +- codex-rs/core/src/codex_tests.rs | 350 +++++++++++++++++- codex-rs/core/src/state/session.rs | 8 +- codex-rs/core/src/state/session_tests.rs | 14 +- codex-rs/protocol/src/protocol.rs | 16 + codex-rs/rollout/src/list.rs | 4 + codex-rs/rollout/src/metadata.rs | 6 +- codex-rs/rollout/src/policy.rs | 7 +- codex-rs/rollout/src/recorder.rs | 4 + codex-rs/state/src/extract.rs | 8 +- codex-rs/state/src/runtime/threads.rs | 6 +- 23 files changed, 600 insertions(+), 153 deletions(-) create mode 100644 codex-rs/core/src/codex/agent_task_lifecycle.rs diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 9e94515dd9b..f4959faff87 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -218,7 +218,9 @@ impl ThreadHistoryBuilder { RolloutItem::EventMsg(event) => self.handle_event(event), RolloutItem::Compacted(payload) => self.handle_compacted(payload), RolloutItem::ResponseItem(item) => self.handle_response_item(item), - RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {} + RolloutItem::TurnContext(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::SessionState(_) => {} } } diff --git a/codex-rs/app-server/tests/suite/v2/client_metadata.rs b/codex-rs/app-server/tests/suite/v2/client_metadata.rs index c85febd7d46..e343e08470b 100644 --- a/codex-rs/app-server/tests/suite/v2/client_metadata.rs +++ b/codex-rs/app-server/tests/suite/v2/client_metadata.rs @@ -18,7 +18,7 @@ use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20); #[tokio::test] async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result<()> { diff --git a/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs b/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs index 7c36827e6dd..0dd4e8174a8 100644 --- a/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs +++ b/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs @@ -21,7 +21,7 @@ use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); /// Confirms the server returns the default collaboration mode presets in a stable order. #[tokio::test] diff --git a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs index 05d26d55e36..0b4c476ca7c 100644 --- a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs +++ b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs @@ -47,7 +47,7 @@ use tokio_tungstenite::tungstenite::http::HeaderValue; use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION; use tokio_tungstenite::tungstenite::http::header::ORIGIN; -pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10); +pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(20); pub(super) type WsClient = WebSocketStream>; type HmacSha256 = Hmac; diff --git a/codex-rs/app-server/tests/suite/v2/initialize.rs b/codex-rs/app-server/tests/suite/v2/initialize.rs index 165160468f7..cfb08a17187 100644 --- a/codex-rs/app-server/tests/suite/v2/initialize.rs +++ b/codex-rs/app-server/tests/suite/v2/initialize.rs @@ -24,7 +24,7 @@ use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20); #[tokio::test] async fn initialize_uses_client_info_name_as_originator() -> Result<()> { diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 9870d410f17..aab588ad411 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -71,7 +71,7 @@ use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::path_regex; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; const V2_STEERING_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous background agent task."; @@ -1710,7 +1710,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( create_shell_command_sse_response( realtime_tool_ok_command(), /*workdir*/ None, - Some(5000), + Some(10_000), "shell_call", )?, create_final_assistant_message_sse_response("shell tool finished")?, diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index 509a578f5c9..d7a6403c937 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -116,6 +116,7 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool { | ResponseItem::Compaction { .. } | ResponseItem::Other, ) => false, + RolloutItem::SessionState(_) => false, RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) diff --git a/codex-rs/core/src/agent/control_tests.rs b/codex-rs/core/src/agent/control_tests.rs index 6fc74b30e86..8645ec6d93a 100644 --- a/codex-rs/core/src/agent/control_tests.rs +++ b/codex-rs/core/src/agent/control_tests.rs @@ -186,7 +186,7 @@ async fn wait_for_subagent_notification(parent_thread: &Arc) -> boo sleep(Duration::from_millis(25)).await; } }; - timeout(Duration::from_secs(2), wait).await.is_ok() + timeout(Duration::from_secs(5), wait).await.is_ok() } async fn persist_thread_for_tree_resume(thread: &Arc, message: &str) { diff --git a/codex-rs/core/src/agent_identity.rs b/codex-rs/core/src/agent_identity.rs index c6e4d9f209d..1fb4509bf1f 100644 --- a/codex-rs/core/src/agent_identity.rs +++ b/codex-rs/core/src/agent_identity.rs @@ -153,14 +153,16 @@ impl AgentIdentityManager { Ok(stored_identity) } - pub(crate) async fn task_matches_current_binding(&self, task: &RegisteredAgentTask) -> bool { + pub(crate) async fn task_matches_current_identity(&self, task: &RegisteredAgentTask) -> bool { if !self.feature_enabled { return false; } - self.current_auth_binding() + self.current_stored_identity() .await - .is_some_and(|(_, binding)| task.matches_binding(&binding)) + .is_some_and(|stored_identity| { + stored_identity.agent_runtime_id == task.agent_runtime_id + }) } async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> { @@ -177,6 +179,11 @@ impl AgentIdentityManager { binding.map(|binding| (auth, binding)) } + async fn current_stored_identity(&self) -> Option { + let (auth, binding) = self.current_auth_binding().await?; + self.load_stored_identity(&auth, &binding).ok().flatten() + } + async fn register_agent_identity( &self, binding: &AgentIdentityBinding, diff --git a/codex-rs/core/src/agent_identity/task_registration.rs b/codex-rs/core/src/agent_identity/task_registration.rs index cada2479f5f..2d06bd9020b 100644 --- a/codex-rs/core/src/agent_identity/task_registration.rs +++ b/codex-rs/core/src/agent_identity/task_registration.rs @@ -2,6 +2,7 @@ use std::time::Duration; use anyhow::Context; use anyhow::Result; +use codex_protocol::protocol::SessionAgentTask; use crypto_box::SecretKey as Curve25519SecretKey; use ed25519_dalek::Signer as _; use serde::Deserialize; @@ -16,9 +17,6 @@ const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15); #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct RegisteredAgentTask { - pub(crate) binding_id: String, - pub(crate) chatgpt_account_id: String, - pub(crate) chatgpt_user_id: Option, pub(crate) agent_runtime_id: String, pub(crate) task_id: String, pub(crate) registered_at: String, @@ -82,9 +80,6 @@ impl AgentIdentityManager { .await .with_context(|| format!("failed to parse agent task response from {url}"))?; let registered_task = RegisteredAgentTask { - binding_id: stored_identity.binding_id.clone(), - chatgpt_account_id: stored_identity.chatgpt_account_id.clone(), - chatgpt_user_id: stored_identity.chatgpt_user_id.clone(), agent_runtime_id: stored_identity.agent_runtime_id.clone(), task_id: decrypt_task_id_response( &stored_identity, @@ -107,18 +102,20 @@ impl AgentIdentityManager { } impl RegisteredAgentTask { - pub(super) fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool { - binding.matches_parts( - &self.binding_id, - &self.chatgpt_account_id, - self.chatgpt_user_id.as_deref(), - ) + pub(crate) fn to_session_agent_task(&self) -> SessionAgentTask { + SessionAgentTask { + agent_runtime_id: self.agent_runtime_id.clone(), + task_id: self.task_id.clone(), + registered_at: self.registered_at.clone(), + } } - pub(crate) fn has_same_binding(&self, other: &Self) -> bool { - self.binding_id == other.binding_id - && self.chatgpt_account_id == other.chatgpt_account_id - && self.chatgpt_user_id == other.chatgpt_user_id + pub(crate) fn from_session_agent_task(task: SessionAgentTask) -> Self { + Self { + agent_runtime_id: task.agent_runtime_id, + task_id: task.task_id, + registered_at: task.registered_at, + } } } @@ -242,9 +239,6 @@ mod tests { assert_eq!( task, RegisteredAgentTask { - binding_id: "chatgpt-account-account-123".to_string(), - chatgpt_account_id: "account-123".to_string(), - chatgpt_user_id: Some("user-123".to_string()), agent_runtime_id: "agent-123".to_string(), task_id: "task_123".to_string(), registered_at: task.registered_at.clone(), @@ -331,9 +325,6 @@ mod tests { assert_eq!( task, RegisteredAgentTask { - binding_id: "chatgpt-account-account-123".to_string(), - chatgpt_account_id: "account-123".to_string(), - chatgpt_user_id: Some("user-123".to_string()), agent_runtime_id: "agent-123".to_string(), task_id: "task_123".to_string(), registered_at: task.registered_at.clone(), @@ -342,7 +333,7 @@ mod tests { } #[tokio::test] - async fn task_matches_current_binding_rejects_stale_auth_binding() { + async fn task_matches_current_identity_rejects_stale_registered_identity() { let auth_manager = AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456"))); let manager = AgentIdentityManager::new_for_tests( @@ -352,15 +343,12 @@ mod tests { SessionSource::Cli, ); let task = RegisteredAgentTask { - binding_id: "chatgpt-account-account-123".to_string(), - chatgpt_account_id: "account-123".to_string(), - chatgpt_user_id: Some("user-123".to_string()), agent_runtime_id: "agent-123".to_string(), task_id: "task_123".to_string(), registered_at: "2026-03-23T12:00:00Z".to_string(), }; - assert!(!manager.task_matches_current_binding(&task).await); + assert!(!manager.task_matches_current_identity(&task).await); } async fn mount_human_biscuit( diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 431f8a4e29f..613f710ccc7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -14,7 +14,6 @@ use crate::agent::MailboxReceiver; use crate::agent::agent_status_from_event; use crate::agent::status::is_final; use crate::agent_identity::AgentIdentityManager; -use crate::agent_identity::RegisteredAgentTask; use crate::apps::render_apps_section; use crate::commit_attribution::commit_message_trailer_instruction; use crate::compact; @@ -203,6 +202,7 @@ use codex_protocol::error::Result as CodexResult; #[cfg(test)] use codex_protocol::exec_output::StreamOutput; +mod agent_task_lifecycle; mod rollout_reconstruction; #[cfg(test)] mod rollout_reconstruction_tests; @@ -862,6 +862,7 @@ pub(crate) struct Session { pub(crate) services: SessionServices, js_repl: Arc, next_internal_sub_id: AtomicU64, + agent_task_registration_lock: Mutex<()>, } #[derive(Clone, Debug)] @@ -1530,7 +1531,10 @@ impl Session { .ensure_registered_identity() .await { - Ok(Some(_)) => return, + Ok(Some(_)) => { + sess.maybe_prewarm_agent_task_registration().await; + return; + } Ok(None) => { drop(sess); if auth_state_rx.changed().await.is_err() { @@ -1561,90 +1565,6 @@ impl Session { .await; } - async fn cached_agent_task_for_current_binding(&self) -> Option { - let agent_task = { - let state = self.state.lock().await; - state.agent_task() - }?; - - if self - .services - .agent_identity_manager - .task_matches_current_binding(&agent_task) - .await - { - debug!( - agent_runtime_id = %agent_task.agent_runtime_id, - task_id = %agent_task.task_id, - "reusing cached agent task" - ); - return Some(agent_task); - } - - debug!( - agent_runtime_id = %agent_task.agent_runtime_id, - task_id = %agent_task.task_id, - "discarding cached agent task because auth binding changed" - ); - let mut state = self.state.lock().await; - if state.agent_task().as_ref() == Some(&agent_task) { - state.clear_agent_task(); - } - None - } - - async fn ensure_agent_task_registered(&self) -> anyhow::Result> { - if let Some(agent_task) = self.cached_agent_task_for_current_binding().await { - return Ok(Some(agent_task)); - } - - for _ in 0..2 { - let Some(agent_task) = self.services.agent_identity_manager.register_task().await? - else { - return Ok(None); - }; - - if !self - .services - .agent_identity_manager - .task_matches_current_binding(&agent_task) - .await - { - debug!( - agent_runtime_id = %agent_task.agent_runtime_id, - task_id = %agent_task.task_id, - "discarding newly registered agent task because auth binding changed" - ); - continue; - } - - { - let mut state = self.state.lock().await; - if let Some(existing_agent_task) = state.agent_task() { - if existing_agent_task.has_same_binding(&agent_task) { - return Ok(Some(existing_agent_task)); - } - debug!( - agent_runtime_id = %existing_agent_task.agent_runtime_id, - task_id = %existing_agent_task.task_id, - "replacing cached agent task because auth binding changed" - ); - } - state.set_agent_task(agent_task.clone()); - } - - info!( - thread_id = %self.conversation_id, - agent_runtime_id = %agent_task.agent_runtime_id, - task_id = %agent_task.task_id, - "registered agent task for thread" - ); - return Ok(Some(agent_task)); - } - - Ok(None) - } - #[allow(clippy::too_many_arguments)] fn make_turn_context( conversation_id: ThreadId, @@ -2256,6 +2176,7 @@ impl Session { services, js_repl, next_internal_sub_id: AtomicU64::new(0), + agent_task_registration_lock: Mutex::new(()), }); if let Some(network_policy_decider_session) = network_policy_decider_session { let mut guard = network_policy_decider_session.write().await; @@ -2531,6 +2452,7 @@ impl Session { } InitialHistory::Resumed(resumed_history) => { let rollout_items = resumed_history.history; + self.restore_persisted_agent_task(&rollout_items).await; let previous_turn_settings = self .apply_rollout_reconstruction(&turn_context, &rollout_items) .await; diff --git a/codex-rs/core/src/codex/agent_task_lifecycle.rs b/codex-rs/core/src/codex/agent_task_lifecycle.rs new file mode 100644 index 00000000000..b45c0f5b04a --- /dev/null +++ b/codex-rs/core/src/codex/agent_task_lifecycle.rs @@ -0,0 +1,157 @@ +use crate::agent_identity::RegisteredAgentTask; +use crate::codex::Session; +use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::SessionAgentTask; +use codex_protocol::protocol::SessionStateUpdate; +use tracing::debug; +use tracing::info; +use tracing::warn; + +impl Session { + pub(super) async fn maybe_prewarm_agent_task_registration(&self) { + // Startup task registration is best-effort: regular turns already retry on demand, and + // a prewarm failure should not shut down the session or block unrelated work. + if let Err(error) = self.ensure_agent_task_registered().await { + warn!( + error = %error, + "startup agent task prewarm failed; regular turns will retry registration" + ); + } + } + + fn latest_persisted_agent_task( + rollout_items: &[RolloutItem], + ) -> Option> { + rollout_items.iter().rev().find_map(|item| match item { + RolloutItem::SessionState(update) => Some(update.agent_task.clone()), + _ => None, + }) + } + + pub(super) async fn restore_persisted_agent_task(&self, rollout_items: &[RolloutItem]) { + let Some(agent_task) = Self::latest_persisted_agent_task(rollout_items).flatten() else { + return; + }; + + let mut state = self.state.lock().await; + state.set_agent_task(agent_task); + } + + async fn persist_agent_task_update(&self, agent_task: Option<&RegisteredAgentTask>) { + self.persist_rollout_items(&[RolloutItem::SessionState(SessionStateUpdate { + agent_task: agent_task.map(RegisteredAgentTask::to_session_agent_task), + })]) + .await; + } + + async fn clear_cached_agent_task(&self, agent_task: &RegisteredAgentTask) { + let cleared = { + let mut state = self.state.lock().await; + if state.agent_task().as_ref() == Some(&agent_task.to_session_agent_task()) { + state.clear_agent_task(); + true + } else { + false + } + }; + if cleared { + self.persist_agent_task_update(/*agent_task*/ None).await; + } + } + + async fn cache_agent_task(&self, agent_task: RegisteredAgentTask) -> RegisteredAgentTask { + let session_agent_task = agent_task.to_session_agent_task(); + let changed = { + let mut state = self.state.lock().await; + if state.agent_task().as_ref() == Some(&session_agent_task) { + false + } else { + state.set_agent_task(session_agent_task); + true + } + }; + if changed { + self.persist_agent_task_update(Some(&agent_task)).await; + } + agent_task + } + + pub(super) async fn cached_agent_task_for_current_identity( + &self, + ) -> Option { + let agent_task = { + let state = self.state.lock().await; + state + .agent_task() + .map(RegisteredAgentTask::from_session_agent_task) + }?; + + if self + .services + .agent_identity_manager + .task_matches_current_identity(&agent_task) + .await + { + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "reusing cached agent task" + ); + return Some(agent_task); + } + + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "discarding cached agent task because the registered agent identity changed" + ); + self.clear_cached_agent_task(&agent_task).await; + None + } + + pub(super) async fn ensure_agent_task_registered( + &self, + ) -> anyhow::Result> { + if let Some(agent_task) = self.cached_agent_task_for_current_identity().await { + return Ok(Some(agent_task)); + } + + let _guard = self.agent_task_registration_lock.lock().await; + if let Some(agent_task) = self.cached_agent_task_for_current_identity().await { + return Ok(Some(agent_task)); + } + + for _ in 0..2 { + let Some(agent_task) = self.services.agent_identity_manager.register_task().await? + else { + return Ok(None); + }; + + if !self + .services + .agent_identity_manager + .task_matches_current_identity(&agent_task) + .await + { + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "discarding newly registered agent task because the registered agent identity changed" + ); + continue; + } + + let agent_task = self.cache_agent_task(agent_task).await; + + info!( + thread_id = %self.conversation_id, + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "registered agent task for thread" + ); + return Ok(Some(agent_task)); + } + + Ok(None) + } +} diff --git a/codex-rs/core/src/codex/rollout_reconstruction.rs b/codex-rs/core/src/codex/rollout_reconstruction.rs index a4c042af0c8..3e407c4cd79 100644 --- a/codex-rs/core/src/codex/rollout_reconstruction.rs +++ b/codex-rs/core/src/codex/rollout_reconstruction.rs @@ -207,7 +207,9 @@ impl Session { active_segment.get_or_insert_with(ActiveReplaySegment::default); active_segment.counts_as_user_turn |= is_user_turn_boundary(response_item); } - RolloutItem::EventMsg(_) | RolloutItem::SessionMeta(_) => {} + RolloutItem::EventMsg(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::SessionState(_) => {} } if base_replacement_history.is_some() @@ -275,6 +277,7 @@ impl Session { history.drop_last_n_user_turns(rollback.num_turns); } RolloutItem::EventMsg(_) + | RolloutItem::SessionState(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {} } diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 6c018bade37..af419bca5a8 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -1,4 +1,6 @@ use super::*; +use crate::agent_identity::RegisteredAgentTask; +use crate::agent_identity::StoredAgentIdentity; use crate::config::ConfigBuilder; use crate::config::test_config; use crate::config_loader::ConfigLayerStack; @@ -15,8 +17,19 @@ use crate::mcp_tool_exposure::build_mcp_tool_exposure; use crate::shell::default_user_shell; use crate::tools::format_exec_output_str; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; +use base64::engine::general_purpose::URL_SAFE_NO_PAD; +use chrono::SecondsFormat; +use chrono::Utc; use codex_features::Features; +use codex_login::AgentIdentityAuthRecord; +use codex_login::AuthCredentialsStoreMode; +use codex_login::AuthDotJson; use codex_login::CodexAuth; +use codex_login::save_auth; +use codex_login::token_data::IdTokenInfo; +use codex_login::token_data::TokenData; use codex_mcp::CODEX_APPS_MCP_SERVER_NAME; use codex_mcp::ToolInfo; use codex_model_provider_info::ModelProviderInfo; @@ -105,13 +118,24 @@ use core_test_support::responses::start_mock_server; use core_test_support::test_codex::test_codex; use core_test_support::tracing::install_test_tracing; use core_test_support::wait_for_event; +use crypto_box::SecretKey as Curve25519SecretKey; +use ed25519_dalek::SigningKey; +use ed25519_dalek::pkcs8::EncodePrivateKey; use opentelemetry::trace::TraceContextExt; use opentelemetry::trace::TraceId; +use sha2::Digest as _; +use sha2::Sha512; use std::path::Path; use std::time::Duration; use tokio::time::sleep; use tokio::time::timeout; use tracing_opentelemetry::OpenTelemetrySpanExt; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::header; +use wiremock::matchers::method; +use wiremock::matchers::path; use codex_protocol::mcp::CallToolResult as McpCallToolResult; use pretty_assertions::assert_eq; @@ -1018,6 +1042,70 @@ async fn record_initial_history_reconstructs_resumed_transcript() { assert_eq!(expected, history.raw_items()); } +#[tokio::test] +async fn record_initial_history_restores_latest_persisted_agent_task() { + let (session, _turn_context) = make_session_and_context().await; + let expected = RegisteredAgentTask { + agent_runtime_id: "agent-123".to_string(), + task_id: "task-123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + }; + let rollout_items = vec![ + RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { + agent_task: Some(expected.to_session_agent_task()), + }), + RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { + agent_task: None, + }), + RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { + agent_task: Some(expected.to_session_agent_task()), + }), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!( + session.state.lock().await.agent_task(), + Some(expected.to_session_agent_task()) + ); +} + +#[tokio::test] +async fn record_initial_history_honors_cleared_persisted_agent_task() { + let (session, _turn_context) = make_session_and_context().await; + let rollout_items = vec![ + RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { + agent_task: Some( + RegisteredAgentTask { + agent_runtime_id: "agent-123".to_string(), + task_id: "task-123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + } + .to_session_agent_task(), + ), + }), + RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { + agent_task: None, + }), + ]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!(session.state.lock().await.agent_task(), None); +} + #[tokio::test] async fn record_initial_history_new_defers_initial_context_until_first_turn() { let (session, _turn_context) = make_session_and_context().await; @@ -2909,6 +2997,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { services, js_repl, next_internal_sub_id: AtomicU64::new(0), + agent_task_registration_lock: Mutex::new(()), }; (session, turn_context) @@ -3555,19 +3644,25 @@ async fn shutdown_and_wait_shuts_down_tracked_ephemeral_guardian_review() { .expect("ephemeral guardian review should receive a shutdown op"); } -pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( +async fn make_session_and_context_with_auth_and_config_and_rx( + auth: CodexAuth, dynamic_tools: Vec, + configure_config: F, ) -> ( Arc, Arc, async_channel::Receiver, -) { +) +where + F: FnOnce(&mut Config), +{ let (tx_event, rx_event) = async_channel::unbounded(); let codex_home = tempfile::tempdir().expect("create temp dir"); - let config = build_test_config(codex_home.path()).await; + let mut config = build_test_config(codex_home.path()).await; + configure_config(&mut config); let config = Arc::new(config); let conversation_id = ThreadId::default(); - let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")); + let auth_manager = AuthManager::from_auth_for_testing(auth); let models_manager = Arc::new(ModelsManager::new( config.codex_home.to_path_buf(), auth_manager.clone(), @@ -3766,11 +3861,45 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( services, js_repl, next_internal_sub_id: AtomicU64::new(0), + agent_task_registration_lock: Mutex::new(()), }); (session, turn_context, rx_event) } +pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( + dynamic_tools: Vec, +) -> ( + Arc, + Arc, + async_channel::Receiver, +) { + make_session_and_context_with_auth_and_config_and_rx( + CodexAuth::from_api_key("Test API Key"), + dynamic_tools, + |_config| {}, + ) + .await +} + +async fn make_agent_identity_session_and_context_with_rx( + auth: CodexAuth, + chatgpt_base_url: String, +) -> ( + Arc, + Arc, + async_channel::Receiver, +) { + make_session_and_context_with_auth_and_config_and_rx(auth, Vec::new(), move |config| { + config.chatgpt_base_url = chatgpt_base_url; + config + .features + .enable(Feature::UseAgentIdentity) + .expect("test config should allow use_agent_identity"); + }) + .await +} + // Like make_session_and_context, but returns Arc and the event receiver // so tests can assert on emitted events. pub(crate) async fn make_session_and_context_with_rx() -> ( @@ -3810,6 +3939,219 @@ async fn fail_agent_identity_registration_emits_error_without_shutdown() { assert!(rx_event.try_recv().is_err()); } +#[tokio::test] +async fn startup_agent_task_prewarm_caches_registered_task() { + let server = MockServer::start().await; + let chatgpt_base_url = server.uri(); + let auth = make_chatgpt_auth("account-123", Some("user-123")); + let stored_identity = seed_stored_identity(&auth, "agent-123", "account-123"); + let encrypted_task_id = + encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext"); + mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await; + Mock::given(method("POST")) + .and(path("/v1/agent/agent-123/task/register")) + .and(header("x-openai-authorization", "human-biscuit")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let (session, _turn_context, rx_event) = + make_agent_identity_session_and_context_with_rx(auth, chatgpt_base_url).await; + + session.maybe_prewarm_agent_task_registration().await; + + let cached_task = session + .state + .lock() + .await + .agent_task() + .expect("task should be cached"); + assert_eq!(cached_task.agent_runtime_id, "agent-123"); + assert_eq!(cached_task.task_id, "task_123"); + assert!(rx_event.try_recv().is_err()); +} + +#[tokio::test] +async fn startup_agent_task_prewarm_failure_does_not_emit_error() { + let server = MockServer::start().await; + let chatgpt_base_url = server.uri(); + let auth = make_chatgpt_auth("account-123", Some("user-123")); + seed_stored_identity(&auth, "agent-123", "account-123"); + mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await; + Mock::given(method("POST")) + .and(path("/v1/agent/agent-123/task/register")) + .and(header("x-openai-authorization", "human-biscuit")) + .respond_with(ResponseTemplate::new(500)) + .expect(1) + .mount(&server) + .await; + + let (session, _turn_context, rx_event) = + make_agent_identity_session_and_context_with_rx(auth, chatgpt_base_url).await; + + session.maybe_prewarm_agent_task_registration().await; + + assert_eq!(session.state.lock().await.agent_task(), None); + assert!(rx_event.try_recv().is_err()); +} + +#[tokio::test] +async fn cached_agent_task_for_current_identity_clears_stale_task() { + let auth = make_chatgpt_auth("account-123", Some("user-123")); + seed_stored_identity(&auth, "agent-123", "account-123"); + let (session, _turn_context, _rx_event) = make_agent_identity_session_and_context_with_rx( + auth, + "https://chatgpt.com/backend-api".to_string(), + ) + .await; + { + let mut state = session.state.lock().await; + state.set_agent_task( + RegisteredAgentTask { + agent_runtime_id: "agent-old".to_string(), + task_id: "task-old".to_string(), + registered_at: "2026-04-15T00:00:00Z".to_string(), + } + .to_session_agent_task(), + ); + } + + assert_eq!(session.cached_agent_task_for_current_identity().await, None); + assert_eq!(session.state.lock().await.agent_task(), None); +} + +fn seed_stored_identity( + auth: &CodexAuth, + agent_runtime_id: &str, + account_id: &str, +) -> StoredAgentIdentity { + let signing_key = generate_test_signing_key(); + let private_key_pkcs8 = signing_key + .to_pkcs8_der() + .expect("encode test signing key as PKCS#8"); + let stored_identity = StoredAgentIdentity { + binding_id: format!("chatgpt-account-{account_id}"), + chatgpt_account_id: account_id.to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: agent_runtime_id.to_string(), + private_key_pkcs8_base64: BASE64_STANDARD.encode(private_key_pkcs8.as_bytes()), + public_key_ssh: "ssh-ed25519 test".to_string(), + registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + abom: crate::agent_identity::AgentBillOfMaterials { + agent_version: env!("CARGO_PKG_VERSION").to_string(), + agent_harness_id: "codex-cli".to_string(), + running_location: format!("{}-{}", SessionSource::Exec, std::env::consts::OS), + }, + }; + + auth.set_agent_identity(AgentIdentityAuthRecord { + workspace_id: account_id.to_string(), + chatgpt_user_id: stored_identity.chatgpt_user_id.clone(), + agent_runtime_id: stored_identity.agent_runtime_id.clone(), + agent_private_key: stored_identity.private_key_pkcs8_base64.clone(), + registered_at: stored_identity.registered_at.clone(), + }) + .expect("store identity"); + + stored_identity +} + +fn encrypt_task_id_for_identity( + stored_identity: &StoredAgentIdentity, + task_id: &str, +) -> anyhow::Result { + let signing_key = stored_identity.signing_key()?; + let mut rng = crypto_box::aead::OsRng; + let public_key = curve25519_secret_key_from_signing_key_for_tests(&signing_key).public_key(); + let ciphertext = public_key + .seal(&mut rng, task_id.as_bytes()) + .map_err(|_| anyhow::anyhow!("failed to encrypt test task id"))?; + Ok(BASE64_STANDARD.encode(ciphertext)) +} + +fn curve25519_secret_key_from_signing_key_for_tests( + signing_key: &SigningKey, +) -> Curve25519SecretKey { + let digest = Sha512::digest(signing_key.to_bytes()); + let mut secret_key = [0u8; 32]; + secret_key.copy_from_slice(&digest[..32]); + secret_key[0] &= 248; + secret_key[31] &= 127; + secret_key[31] |= 64; + Curve25519SecretKey::from(secret_key) +} + +fn generate_test_signing_key() -> SigningKey { + SigningKey::from_bytes(&[7u8; 32]) +} + +async fn mount_human_biscuit(server: &MockServer, chatgpt_base_url: &str, agent_runtime_id: &str) { + let biscuit_url = format!( + "{}/authenticate_app_v2", + chatgpt_base_url.trim_end_matches('/') + ); + let biscuit_path = reqwest::Url::parse(&biscuit_url) + .expect("biscuit URL parses") + .path() + .to_string(); + let target_url = format!( + "{}/v1/agent/{agent_runtime_id}/task/register", + chatgpt_base_url.trim_end_matches('/') + ); + Mock::given(method("GET")) + .and(path(biscuit_path)) + .and(header("authorization", "Bearer access-token-account-123")) + .and(header("x-original-method", "POST")) + .and(header("x-original-url", target_url)) + .respond_with( + ResponseTemplate::new(200).insert_header("x-openai-authorization", "human-biscuit"), + ) + .expect(1) + .mount(server) + .await; +} + +fn make_chatgpt_auth(account_id: &str, user_id: Option<&str>) -> CodexAuth { + let tempdir = tempfile::tempdir().expect("tempdir"); + let auth_json = AuthDotJson { + auth_mode: Some(codex_app_server_protocol::AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(TokenData { + id_token: IdTokenInfo { + email: None, + chatgpt_plan_type: None, + chatgpt_user_id: user_id.map(ToOwned::to_owned), + chatgpt_account_id: Some(account_id.to_string()), + raw_jwt: fake_id_token(account_id, user_id), + }, + access_token: format!("access-token-{account_id}"), + refresh_token: "refresh-token".to_string(), + account_id: Some(account_id.to_string()), + }), + last_refresh: Some(Utc::now()), + agent_identity: None, + }; + save_auth(tempdir.path(), &auth_json, AuthCredentialsStoreMode::File).expect("save auth"); + CodexAuth::from_auth_storage(tempdir.path(), AuthCredentialsStoreMode::File) + .expect("load auth") + .expect("auth") +} + +fn fake_id_token(account_id: &str, user_id: Option<&str>) -> String { + let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#); + let payload = serde_json::json!({ + "https://api.openai.com/auth": { + "chatgpt_user_id": user_id, + "chatgpt_account_id": account_id, + } + }); + let payload = URL_SAFE_NO_PAD.encode(payload.to_string()); + format!("{header}.{payload}.signature") +} + #[tokio::test] async fn refresh_mcp_servers_is_deferred_until_next_turn() { let (session, turn_context) = make_session_and_context().await; diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 42ed15e87ae..ac76d4cadc7 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -6,12 +6,12 @@ use codex_sandboxing::policy_transforms::merge_permission_profiles; use std::collections::HashMap; use std::collections::HashSet; -use crate::agent_identity::RegisteredAgentTask; use crate::codex::PreviousTurnSettings; use crate::codex::SessionConfiguration; use crate::context_manager::ContextManager; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use codex_protocol::protocol::RateLimitSnapshot; +use codex_protocol::protocol::SessionAgentTask; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::TurnContextItem; @@ -31,7 +31,7 @@ pub(crate) struct SessionState { previous_turn_settings: Option, /// Startup prewarmed session prepared during session initialization. pub(crate) startup_prewarm: Option, - pub(crate) agent_task: Option, + pub(crate) agent_task: Option, pub(crate) active_connector_selection: HashSet, pub(crate) pending_session_start_source: Option, granted_permissions: Option, @@ -189,11 +189,11 @@ impl SessionState { self.startup_prewarm.take() } - pub(crate) fn agent_task(&self) -> Option { + pub(crate) fn agent_task(&self) -> Option { self.agent_task.clone() } - pub(crate) fn set_agent_task(&mut self, agent_task: RegisteredAgentTask) { + pub(crate) fn set_agent_task(&mut self, agent_task: SessionAgentTask) { self.agent_task = Some(agent_task); } diff --git a/codex-rs/core/src/state/session_tests.rs b/codex-rs/core/src/state/session_tests.rs index 6816c8731da..eecce2b8762 100644 --- a/codex-rs/core/src/state/session_tests.rs +++ b/codex-rs/core/src/state/session_tests.rs @@ -1,8 +1,8 @@ use super::*; -use crate::agent_identity::RegisteredAgentTask; use crate::codex::make_session_configuration_for_tests; use codex_protocol::protocol::CreditsSnapshot; use codex_protocol::protocol::RateLimitWindow; +use codex_protocol::protocol::SessionAgentTask; use pretty_assertions::assert_eq; #[tokio::test] @@ -38,11 +38,8 @@ async fn clear_connector_selection_removes_entries() { async fn set_agent_task_persists_plaintext_task_for_session_reuse() { let session_configuration = make_session_configuration_for_tests().await; let mut state = SessionState::new(session_configuration); - let agent_task = RegisteredAgentTask { - binding_id: "chatgpt-account-account-123".to_string(), - chatgpt_account_id: "account-123".to_string(), - chatgpt_user_id: Some("user-123".to_string()), - agent_runtime_id: "agent_123".to_string(), + let agent_task = SessionAgentTask { + agent_runtime_id: "agent-123".to_string(), task_id: "task_123".to_string(), registered_at: "2026-03-23T12:00:00Z".to_string(), }; @@ -56,10 +53,7 @@ async fn set_agent_task_persists_plaintext_task_for_session_reuse() { async fn clear_agent_task_removes_cached_task() { let session_configuration = make_session_configuration_for_tests().await; let mut state = SessionState::new(session_configuration); - let agent_task = RegisteredAgentTask { - binding_id: "chatgpt-account-account-123".to_string(), - chatgpt_account_id: "account-123".to_string(), - chatgpt_user_id: Some("user-123".to_string()), + let agent_task = SessionAgentTask { agent_runtime_id: "agent_123".to_string(), task_id: "task_123".to_string(), registered_at: "2026-03-23T12:00:00Z".to_string(), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 244f748c820..483caebb990 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2713,6 +2713,21 @@ impl fmt::Display for SubAgentSource { } } +/// Persisted agent-task details that let a resumed thread keep using the same backend task. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] +pub struct SessionAgentTask { + pub agent_runtime_id: String, + pub task_id: String, + pub registered_at: String, +} + +/// Session-scoped state updates that can be appended after the canonical SessionMeta line. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS, Default)] +pub struct SessionStateUpdate { + #[serde(default)] + pub agent_task: Option, +} + /// SessionMeta contains session-level data that doesn't correspond to a specific turn. /// /// NOTE: There used to be an `instructions` field here, which stored user_instructions, but we @@ -2782,6 +2797,7 @@ pub struct SessionMetaLine { #[serde(tag = "type", content = "payload", rename_all = "snake_case")] pub enum RolloutItem { SessionMeta(SessionMetaLine), + SessionState(SessionStateUpdate), ResponseItem(ResponseItem), Compacted(CompactedItem), TurnContext(TurnContextItem), diff --git a/codex-rs/rollout/src/list.rs b/codex-rs/rollout/src/list.rs index afa41033077..554e1ce9514 100644 --- a/codex-rs/rollout/src/list.rs +++ b/codex-rs/rollout/src/list.rs @@ -1065,6 +1065,9 @@ async fn read_head_summary(path: &Path, head_limit: usize) -> io::Result { // Not included in `head`; skip. } + RolloutItem::SessionState(_) => { + // Not included in `head`; skip. + } RolloutItem::EventMsg(ev) => { if let EventMsg::UserMessage(user) = ev { summary.saw_user_event = true; @@ -1117,6 +1120,7 @@ pub async fn read_head_for_summary(path: &Path) -> io::Result {} } diff --git a/codex-rs/rollout/src/metadata.rs b/codex-rs/rollout/src/metadata.rs index 58d55a887df..65e347a3232 100644 --- a/codex-rs/rollout/src/metadata.rs +++ b/codex-rs/rollout/src/metadata.rs @@ -70,7 +70,8 @@ pub fn builder_from_items( ) -> Option { if let Some(session_meta) = items.iter().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => Some(meta_line), - RolloutItem::ResponseItem(_) + RolloutItem::SessionState(_) + | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, @@ -124,7 +125,8 @@ pub async fn extract_metadata_from_rollout( metadata, memory_mode: items.iter().rev().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(), - RolloutItem::ResponseItem(_) + RolloutItem::SessionState(_) + | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 4b50781e76e..6e6fb8b7a26 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -16,9 +16,10 @@ pub fn is_persisted_response_item(item: &RolloutItem, mode: EventPersistenceMode RolloutItem::ResponseItem(item) => should_persist_response_item(item), RolloutItem::EventMsg(ev) => should_persist_event_msg(ev, mode), // Persist Codex executive markers so we can analyze flows (e.g., compaction, API turns). - RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => { - true - } + RolloutItem::Compacted(_) + | RolloutItem::TurnContext(_) + | RolloutItem::SessionMeta(_) + | RolloutItem::SessionState(_) => true, } } diff --git a/codex-rs/rollout/src/recorder.rs b/codex-rs/rollout/src/recorder.rs index 4f88ff1a216..0daffc0572f 100644 --- a/codex-rs/rollout/src/recorder.rs +++ b/codex-rs/rollout/src/recorder.rs @@ -683,6 +683,9 @@ impl RolloutRecorder { RolloutItem::Compacted(item) => { items.push(RolloutItem::Compacted(item)); } + RolloutItem::SessionState(update) => { + items.push(RolloutItem::SessionState(update)); + } RolloutItem::TurnContext(item) => { items.push(RolloutItem::TurnContext(item)); } @@ -1303,6 +1306,7 @@ async fn resume_candidate_matches_cwd( && let Some(latest_turn_context_cwd) = items.iter().rev().find_map(|item| match item { RolloutItem::TurnContext(turn_context) => Some(turn_context.cwd.as_path()), RolloutItem::SessionMeta(_) + | RolloutItem::SessionState(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) | RolloutItem::EventMsg(_) => None, diff --git a/codex-rs/state/src/extract.rs b/codex-rs/state/src/extract.rs index 91c6755ffe9..92946373615 100644 --- a/codex-rs/state/src/extract.rs +++ b/codex-rs/state/src/extract.rs @@ -19,6 +19,7 @@ pub fn apply_rollout_item( ) { match item { RolloutItem::SessionMeta(meta_line) => apply_session_meta_from_item(metadata, meta_line), + RolloutItem::SessionState(_) => {} RolloutItem::TurnContext(turn_ctx) => apply_turn_context(metadata, turn_ctx), RolloutItem::EventMsg(event) => apply_event_msg(metadata, event), RolloutItem::ResponseItem(item) => apply_response_item(metadata, item), @@ -36,9 +37,10 @@ pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool { RolloutItem::EventMsg( EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadNameUpdated(_), ) => true, - RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => { - false - } + RolloutItem::SessionState(_) + | RolloutItem::EventMsg(_) + | RolloutItem::ResponseItem(_) + | RolloutItem::Compacted(_) => false, } } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index efd5b9c1921..c8ff450f67b 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -954,7 +954,8 @@ SELECT pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option>> { items.iter().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => Some(meta_line.meta.dynamic_tools.clone()), - RolloutItem::ResponseItem(_) + RolloutItem::SessionState(_) + | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, @@ -964,7 +965,8 @@ pub(super) fn extract_dynamic_tools(items: &[RolloutItem]) -> Option Option { items.iter().rev().find_map(|item| match item { RolloutItem::SessionMeta(meta_line) => meta_line.meta.memory_mode.clone(), - RolloutItem::ResponseItem(_) + RolloutItem::SessionState(_) + | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) | RolloutItem::TurnContext(_) | RolloutItem::EventMsg(_) => None, From 1f624eda170a3c0b0bd0b247f894768411949d36 Mon Sep 17 00:00:00 2001 From: adrian Date: Wed, 15 Apr 2026 18:11:00 -0700 Subject: [PATCH 2/4] Stabilize Windows unsubscribe test --- .../app-server/tests/suite/v2/thread_unsubscribe.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs index 6aab3d186f9..d26973504c5 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs @@ -130,9 +130,13 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<( async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> { #[cfg(target_os = "windows")] let shell_command = vec![ - "powershell".to_string(), - "-Command".to_string(), - "Start-Sleep -Seconds 1".to_string(), + "cmd.exe".to_string(), + "/d".to_string(), + "/c".to_string(), + "ping".to_string(), + "-n".to_string(), + "2".to_string(), + "127.0.0.1".to_string(), ]; #[cfg(not(target_os = "windows"))] let shell_command = vec!["sleep".to_string(), "1".to_string()]; From d71c5e809d74dab50bf7416bac7e9cc31c7a38a8 Mon Sep 17 00:00:00 2001 From: adrian Date: Wed, 15 Apr 2026 22:06:16 -0700 Subject: [PATCH 3/4] Fix agent task prewarm resume ordering --- codex-rs/app-server/tests/suite/v2/client_metadata.rs | 3 ++- .../tests/suite/v2/collaboration_mode_list.rs | 3 ++- .../tests/suite/v2/connection_handling_websocket.rs | 11 ++++++----- codex-rs/app-server/tests/suite/v2/initialize.rs | 3 ++- .../tests/suite/v2/realtime_conversation.rs | 4 ++-- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/src/codex/agent_task_lifecycle.rs | 7 +++++-- codex-rs/core/src/codex_tests.rs | 11 +++++++++++ 8 files changed, 31 insertions(+), 13 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/client_metadata.rs b/codex-rs/app-server/tests/suite/v2/client_metadata.rs index e343e08470b..e85bf0dcfc2 100644 --- a/codex-rs/app-server/tests/suite/v2/client_metadata.rs +++ b/codex-rs/app-server/tests/suite/v2/client_metadata.rs @@ -18,7 +18,8 @@ use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20); +// These tests start full app-server processes; keep headroom for concurrent debug startup. +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); #[tokio::test] async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result<()> { diff --git a/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs b/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs index 0dd4e8174a8..4bbc163e422 100644 --- a/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs +++ b/codex-rs/app-server/tests/suite/v2/collaboration_mode_list.rs @@ -21,7 +21,8 @@ use pretty_assertions::assert_eq; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); +// These tests start full app-server processes; keep headroom for concurrent debug startup. +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); /// Confirms the server returns the default collaboration mode presets in a stable order. #[tokio::test] diff --git a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs index 0b4c476ca7c..1d07a280a62 100644 --- a/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs +++ b/codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs @@ -47,7 +47,8 @@ use tokio_tungstenite::tungstenite::http::HeaderValue; use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION; use tokio_tungstenite::tungstenite::http::header::ORIGIN; -pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(20); +// These tests start full app-server processes; keep headroom for concurrent debug startup. +pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(30); pub(super) type WsClient = WebSocketStream>; type HmacSha256 = Hmac; @@ -399,7 +400,7 @@ pub(super) async fn spawn_websocket_server_with_args( .take() .context("failed to capture websocket app-server stderr")?; let mut stderr_reader = BufReader::new(stderr).lines(); - let deadline = Instant::now() + Duration::from_secs(10); + let deadline = Instant::now() + DEFAULT_READ_TIMEOUT; let bind_addr = loop { let line = timeout( deadline.saturating_duration_since(Instant::now()), @@ -457,7 +458,7 @@ pub(super) async fn connect_websocket_with_bearer( ) -> Result { let url = format!("ws://{}", connectable_bind_addr(bind_addr)); let request = websocket_request(url.as_str(), bearer_token, /*origin*/ None)?; - let deadline = Instant::now() + Duration::from_secs(10); + let deadline = Instant::now() + DEFAULT_READ_TIMEOUT; loop { match connect_async(request.clone()).await { Ok((stream, _response)) => return Ok(stream), @@ -524,7 +525,7 @@ async fn run_websocket_server_to_completion_with_args( .stderr(Stdio::piped()) .env("CODEX_HOME", codex_home) .env("RUST_LOG", "debug"); - timeout(Duration::from_secs(10), cmd.output()) + timeout(DEFAULT_READ_TIMEOUT, cmd.output()) .await .context("timed out waiting for websocket app-server to exit")? .context("failed to run websocket app-server") @@ -536,7 +537,7 @@ async fn http_get( path: &str, ) -> Result { let connectable_bind_addr = connectable_bind_addr(bind_addr); - let deadline = Instant::now() + Duration::from_secs(10); + let deadline = Instant::now() + DEFAULT_READ_TIMEOUT; loop { match client .get(format!("http://{connectable_bind_addr}{path}")) diff --git a/codex-rs/app-server/tests/suite/v2/initialize.rs b/codex-rs/app-server/tests/suite/v2/initialize.rs index cfb08a17187..81faab16533 100644 --- a/codex-rs/app-server/tests/suite/v2/initialize.rs +++ b/codex-rs/app-server/tests/suite/v2/initialize.rs @@ -24,7 +24,8 @@ use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; -const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20); +// These tests start full app-server processes; keep headroom for concurrent debug startup. +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); #[tokio::test] async fn initialize_uses_client_info_name_as_originator() -> Result<()> { diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index aab588ad411..9870d410f17 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -71,7 +71,7 @@ use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::path_regex; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; const V2_STEERING_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous background agent task."; @@ -1710,7 +1710,7 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( create_shell_command_sse_response( realtime_tool_ok_command(), /*workdir*/ None, - Some(10_000), + Some(5000), "shell_call", )?, create_final_assistant_message_sse_response("shell tool finished")?, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 613f710ccc7..fc6ca89c845 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2213,7 +2213,6 @@ impl Session { // Start the watcher after SessionConfigured so it cannot emit earlier events. sess.start_skills_watcher_listener(); - sess.start_agent_identity_registration(); // Construct sandbox_state before MCP startup so it can be sent to each // MCP server immediately after it becomes ready (avoiding blocking). let sandbox_state = SandboxState { @@ -2302,6 +2301,7 @@ impl Session { // record_initial_history can emit events. We record only after the SessionConfiguredEvent is emitted. sess.record_initial_history(initial_history).await; + sess.start_agent_identity_registration(); { let mut state = sess.state.lock().await; state.set_pending_session_start_source(Some(session_start_source)); diff --git a/codex-rs/core/src/codex/agent_task_lifecycle.rs b/codex-rs/core/src/codex/agent_task_lifecycle.rs index b45c0f5b04a..5d33999d933 100644 --- a/codex-rs/core/src/codex/agent_task_lifecycle.rs +++ b/codex-rs/core/src/codex/agent_task_lifecycle.rs @@ -29,12 +29,15 @@ impl Session { } pub(super) async fn restore_persisted_agent_task(&self, rollout_items: &[RolloutItem]) { - let Some(agent_task) = Self::latest_persisted_agent_task(rollout_items).flatten() else { + let Some(agent_task_update) = Self::latest_persisted_agent_task(rollout_items) else { return; }; let mut state = self.state.lock().await; - state.set_agent_task(agent_task); + match agent_task_update { + Some(agent_task) => state.set_agent_task(agent_task), + None => state.clear_agent_task(), + } } async fn persist_agent_task_update(&self, agent_task: Option<&RegisteredAgentTask>) { diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index af419bca5a8..97383433318 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -1079,6 +1079,17 @@ async fn record_initial_history_restores_latest_persisted_agent_task() { #[tokio::test] async fn record_initial_history_honors_cleared_persisted_agent_task() { let (session, _turn_context) = make_session_and_context().await; + { + let mut state = session.state.lock().await; + state.set_agent_task( + RegisteredAgentTask { + agent_runtime_id: "agent-fresh".to_string(), + task_id: "task-fresh".to_string(), + registered_at: "2026-03-23T12:01:00Z".to_string(), + } + .to_session_agent_task(), + ); + } let rollout_items = vec![ RolloutItem::SessionState(codex_protocol::protocol::SessionStateUpdate { agent_task: Some( From 615183b2f2b4cd76a1945b164d7903a97a6396ef Mon Sep 17 00:00:00 2001 From: adrian Date: Thu, 16 Apr 2026 09:43:17 -0700 Subject: [PATCH 4/4] Validate restored agent task identity --- .../core/src/codex/agent_task_lifecycle.rs | 28 +++++++++++-- codex-rs/core/src/codex_tests.rs | 41 ++++++++++++++++++- codex-rs/protocol/src/protocol.rs | 1 + 3 files changed, 66 insertions(+), 4 deletions(-) diff --git a/codex-rs/core/src/codex/agent_task_lifecycle.rs b/codex-rs/core/src/codex/agent_task_lifecycle.rs index 5d33999d933..fbcd90e717a 100644 --- a/codex-rs/core/src/codex/agent_task_lifecycle.rs +++ b/codex-rs/core/src/codex/agent_task_lifecycle.rs @@ -33,10 +33,32 @@ impl Session { return; }; - let mut state = self.state.lock().await; match agent_task_update { - Some(agent_task) => state.set_agent_task(agent_task), - None => state.clear_agent_task(), + Some(agent_task) => { + let registered_task = + RegisteredAgentTask::from_session_agent_task(agent_task.clone()); + if self + .services + .agent_identity_manager + .task_matches_current_identity(®istered_task) + .await + { + let mut state = self.state.lock().await; + state.set_agent_task(agent_task); + } else { + debug!( + agent_runtime_id = %registered_task.agent_runtime_id, + task_id = %registered_task.task_id, + "discarding persisted agent task because it does not match the registered agent identity" + ); + let mut state = self.state.lock().await; + state.clear_agent_task(); + } + } + None => { + let mut state = self.state.lock().await; + state.clear_agent_task(); + } } } diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 97383433318..831cb3eb6be 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -1044,7 +1044,13 @@ async fn record_initial_history_reconstructs_resumed_transcript() { #[tokio::test] async fn record_initial_history_restores_latest_persisted_agent_task() { - let (session, _turn_context) = make_session_and_context().await; + let auth = make_chatgpt_auth("account-123", Some("user-123")); + seed_stored_identity(&auth, "agent-123", "account-123"); + let (session, _turn_context, _rx_event) = make_agent_identity_session_and_context_with_rx( + auth, + "https://chatgpt.com/backend-api".to_string(), + ) + .await; let expected = RegisteredAgentTask { agent_runtime_id: "agent-123".to_string(), task_id: "task-123".to_string(), @@ -1076,6 +1082,39 @@ async fn record_initial_history_restores_latest_persisted_agent_task() { ); } +#[tokio::test] +async fn record_initial_history_discards_persisted_agent_task_for_different_identity() { + let auth = make_chatgpt_auth("account-123", Some("user-123")); + seed_stored_identity(&auth, "agent-123", "account-123"); + let (session, _turn_context, _rx_event) = make_agent_identity_session_and_context_with_rx( + auth, + "https://chatgpt.com/backend-api".to_string(), + ) + .await; + let rollout_items = vec![RolloutItem::SessionState( + codex_protocol::protocol::SessionStateUpdate { + agent_task: Some( + RegisteredAgentTask { + agent_runtime_id: "agent-other".to_string(), + task_id: "task-other".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + } + .to_session_agent_task(), + ), + }, + )]; + + session + .record_initial_history(InitialHistory::Resumed(ResumedHistory { + conversation_id: ThreadId::default(), + history: rollout_items, + rollout_path: PathBuf::from("/tmp/resume.jsonl"), + })) + .await; + + assert_eq!(session.state.lock().await.agent_task(), None); +} + #[tokio::test] async fn record_initial_history_honors_cleared_persisted_agent_task() { let (session, _turn_context) = make_session_and_context().await; diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 483caebb990..c0f8b28dcd9 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -2716,6 +2716,7 @@ impl fmt::Display for SubAgentSource { /// Persisted agent-task details that let a resumed thread keep using the same backend task. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)] pub struct SessionAgentTask { + /// Validation metadata only. Restored tasks must match the globally registered identity. pub agent_runtime_id: String, pub task_id: String, pub registered_at: String,