Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion codex-rs/app-server-protocol/src/protocol/thread_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ impl ThreadHistoryBuilder {
RolloutItem::EventMsg(event) => self.handle_event(event),
RolloutItem::Compacted(payload) => self.handle_compacted(payload),
RolloutItem::ResponseItem(item) => self.handle_response_item(item),
RolloutItem::TurnContext(_) | RolloutItem::SessionMeta(_) => {}
RolloutItem::TurnContext(_)
| RolloutItem::SessionMeta(_)
| RolloutItem::SessionState(_) => {}
}
}

Expand Down
3 changes: 2 additions & 1 deletion codex-rs/app-server/tests/suite/v2/client_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
// These tests start full app-server processes; keep headroom for concurrent debug startup.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

#[tokio::test]
async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
// These tests start full app-server processes; keep headroom for concurrent debug startup.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

/// Confirms the server returns the default collaboration mode presets in a stable order.
#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
use tokio_tungstenite::tungstenite::http::header::ORIGIN;

pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
// These tests start full app-server processes; keep headroom for concurrent debug startup.
pub(super) const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(30);

pub(super) type WsClient = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
type HmacSha256 = Hmac<Sha256>;
Expand Down Expand Up @@ -399,7 +400,7 @@ pub(super) async fn spawn_websocket_server_with_args(
.take()
.context("failed to capture websocket app-server stderr")?;
let mut stderr_reader = BufReader::new(stderr).lines();
let deadline = Instant::now() + Duration::from_secs(10);
let deadline = Instant::now() + DEFAULT_READ_TIMEOUT;
let bind_addr = loop {
let line = timeout(
deadline.saturating_duration_since(Instant::now()),
Expand Down Expand Up @@ -457,7 +458,7 @@ pub(super) async fn connect_websocket_with_bearer(
) -> Result<WsClient> {
let url = format!("ws://{}", connectable_bind_addr(bind_addr));
let request = websocket_request(url.as_str(), bearer_token, /*origin*/ None)?;
let deadline = Instant::now() + Duration::from_secs(10);
let deadline = Instant::now() + DEFAULT_READ_TIMEOUT;
loop {
match connect_async(request.clone()).await {
Ok((stream, _response)) => return Ok(stream),
Expand Down Expand Up @@ -524,7 +525,7 @@ async fn run_websocket_server_to_completion_with_args(
.stderr(Stdio::piped())
.env("CODEX_HOME", codex_home)
.env("RUST_LOG", "debug");
timeout(Duration::from_secs(10), cmd.output())
timeout(DEFAULT_READ_TIMEOUT, cmd.output())
.await
.context("timed out waiting for websocket app-server to exit")?
.context("failed to run websocket app-server")
Expand All @@ -536,7 +537,7 @@ async fn http_get(
path: &str,
) -> Result<reqwest::Response> {
let connectable_bind_addr = connectable_bind_addr(bind_addr);
let deadline = Instant::now() + Duration::from_secs(10);
let deadline = Instant::now() + DEFAULT_READ_TIMEOUT;
loop {
match client
.get(format!("http://{connectable_bind_addr}{path}"))
Expand Down
3 changes: 2 additions & 1 deletion codex-rs/app-server/tests/suite/v2/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
// These tests start full app-server processes; keep headroom for concurrent debug startup.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

#[tokio::test]
async fn initialize_uses_client_info_name_as_originator() -> Result<()> {
Expand Down
10 changes: 7 additions & 3 deletions codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,13 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 1".to_string(),
"cmd.exe".to_string(),
"/d".to_string(),
"/c".to_string(),
"ping".to_string(),
"-n".to_string(),
"2".to_string(),
"127.0.0.1".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "1".to_string()];
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn keep_forked_rollout_item(item: &RolloutItem) -> bool {
| ResponseItem::Compaction { .. }
| ResponseItem::Other,
) => false,
RolloutItem::SessionState(_) => false,
RolloutItem::Compacted(_)
| RolloutItem::EventMsg(_)
| RolloutItem::SessionMeta(_)
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/agent/control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> boo
sleep(Duration::from_millis(25)).await;
}
};
timeout(Duration::from_secs(2), wait).await.is_ok()
timeout(Duration::from_secs(5), wait).await.is_ok()
}

async fn persist_thread_for_tree_resume(thread: &Arc<CodexThread>, message: &str) {
Expand Down
13 changes: 10 additions & 3 deletions codex-rs/core/src/agent_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,16 @@ impl AgentIdentityManager {
Ok(stored_identity)
}

pub(crate) async fn task_matches_current_binding(&self, task: &RegisteredAgentTask) -> bool {
pub(crate) async fn task_matches_current_identity(&self, task: &RegisteredAgentTask) -> bool {
if !self.feature_enabled {
return false;
}

self.current_auth_binding()
self.current_stored_identity()
.await
.is_some_and(|(_, binding)| task.matches_binding(&binding))
.is_some_and(|stored_identity| {
stored_identity.agent_runtime_id == task.agent_runtime_id
})
}

async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> {
Expand All @@ -177,6 +179,11 @@ impl AgentIdentityManager {
binding.map(|binding| (auth, binding))
}

async fn current_stored_identity(&self) -> Option<StoredAgentIdentity> {
let (auth, binding) = self.current_auth_binding().await?;
self.load_stored_identity(&auth, &binding).ok().flatten()
}

async fn register_agent_identity(
&self,
binding: &AgentIdentityBinding,
Expand Down
42 changes: 15 additions & 27 deletions codex-rs/core/src/agent_identity/task_registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::Context;
use anyhow::Result;
use codex_protocol::protocol::SessionAgentTask;
use crypto_box::SecretKey as Curve25519SecretKey;
use ed25519_dalek::Signer as _;
use serde::Deserialize;
Expand All @@ -16,9 +17,6 @@ const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15);

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct RegisteredAgentTask {
pub(crate) binding_id: String,
pub(crate) chatgpt_account_id: String,
pub(crate) chatgpt_user_id: Option<String>,
pub(crate) agent_runtime_id: String,
pub(crate) task_id: String,
pub(crate) registered_at: String,
Expand Down Expand Up @@ -82,9 +80,6 @@ impl AgentIdentityManager {
.await
.with_context(|| format!("failed to parse agent task response from {url}"))?;
let registered_task = RegisteredAgentTask {
binding_id: stored_identity.binding_id.clone(),
chatgpt_account_id: stored_identity.chatgpt_account_id.clone(),
chatgpt_user_id: stored_identity.chatgpt_user_id.clone(),
agent_runtime_id: stored_identity.agent_runtime_id.clone(),
task_id: decrypt_task_id_response(
&stored_identity,
Expand All @@ -107,18 +102,20 @@ impl AgentIdentityManager {
}

impl RegisteredAgentTask {
pub(super) fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool {
binding.matches_parts(
&self.binding_id,
&self.chatgpt_account_id,
self.chatgpt_user_id.as_deref(),
)
pub(crate) fn to_session_agent_task(&self) -> SessionAgentTask {
SessionAgentTask {
agent_runtime_id: self.agent_runtime_id.clone(),
task_id: self.task_id.clone(),
registered_at: self.registered_at.clone(),
}
}

pub(crate) fn has_same_binding(&self, other: &Self) -> bool {
self.binding_id == other.binding_id
&& self.chatgpt_account_id == other.chatgpt_account_id
&& self.chatgpt_user_id == other.chatgpt_user_id
pub(crate) fn from_session_agent_task(task: SessionAgentTask) -> Self {
Self {
agent_runtime_id: task.agent_runtime_id,
task_id: task.task_id,
registered_at: task.registered_at,
}
}
}

Expand Down Expand Up @@ -242,9 +239,6 @@ mod tests {
assert_eq!(
task,
RegisteredAgentTask {
binding_id: "chatgpt-account-account-123".to_string(),
chatgpt_account_id: "account-123".to_string(),
chatgpt_user_id: Some("user-123".to_string()),
agent_runtime_id: "agent-123".to_string(),
task_id: "task_123".to_string(),
registered_at: task.registered_at.clone(),
Expand Down Expand Up @@ -331,9 +325,6 @@ mod tests {
assert_eq!(
task,
RegisteredAgentTask {
binding_id: "chatgpt-account-account-123".to_string(),
chatgpt_account_id: "account-123".to_string(),
chatgpt_user_id: Some("user-123".to_string()),
agent_runtime_id: "agent-123".to_string(),
task_id: "task_123".to_string(),
registered_at: task.registered_at.clone(),
Expand All @@ -342,7 +333,7 @@ mod tests {
}

#[tokio::test]
async fn task_matches_current_binding_rejects_stale_auth_binding() {
async fn task_matches_current_identity_rejects_stale_registered_identity() {
let auth_manager =
AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456")));
let manager = AgentIdentityManager::new_for_tests(
Expand All @@ -352,15 +343,12 @@ mod tests {
SessionSource::Cli,
);
let task = RegisteredAgentTask {
binding_id: "chatgpt-account-account-123".to_string(),
chatgpt_account_id: "account-123".to_string(),
chatgpt_user_id: Some("user-123".to_string()),
agent_runtime_id: "agent-123".to_string(),
task_id: "task_123".to_string(),
registered_at: "2026-03-23T12:00:00Z".to_string(),
};

assert!(!manager.task_matches_current_binding(&task).await);
assert!(!manager.task_matches_current_identity(&task).await);
}

async fn mount_human_biscuit(
Expand Down
Loading
Loading