From 42adc2c048056f578891e20165c84954ce8a034e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 15 Apr 2026 22:25:11 +0100 Subject: [PATCH 1/2] fix: more flake --- .../suite/responses_api_proxy_headers.rs | 296 +++++------------- codex-rs/responses-api-proxy/src/dump.rs | 18 ++ 2 files changed, 92 insertions(+), 222 deletions(-) diff --git a/codex-rs/core/tests/suite/responses_api_proxy_headers.rs b/codex-rs/core/tests/suite/responses_api_proxy_headers.rs index 8199956ee03..2c4519ae749 100644 --- a/codex-rs/core/tests/suite/responses_api_proxy_headers.rs +++ b/codex-rs/core/tests/suite/responses_api_proxy_headers.rs @@ -1,6 +1,5 @@ -//! Exercises a real `responses-api-proxy` process with request dumping enabled, then verifies that -//! parent and spawned subagent requests carry the expected window, parent-thread, and subagent -//! identity headers in the dumped Responses API requests. +//! Verifies that parent and spawned subagent Responses API requests carry the expected window, +//! parent-thread, and subagent identity headers. use anyhow::Result; use anyhow::anyhow; @@ -10,6 +9,8 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::Op; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::user_input::UserInput; +use core_test_support::responses::ResponseMock; +use core_test_support::responses::ResponsesRequest; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_function_call; @@ -21,119 +22,28 @@ use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodex; use core_test_support::test_codex::test_codex; use pretty_assertions::assert_eq; -use serde_json::Value; use serde_json::json; -use std::io::ErrorKind; -use std::io::Write; -use std::path::Path; -use std::process::Child; -use std::process::Command as StdCommand; -use std::process::Stdio; use std::time::Duration; -use std::time::Instant; -use tempfile::TempDir; const PARENT_PROMPT: &str = "spawn a subagent and report when it is started"; const CHILD_PROMPT: &str = "child: say done"; const SPAWN_CALL_ID: &str = "spawn-call-1"; -const PROXY_START_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 30); -const PROXY_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20); +const REQUEST_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20); const TURN_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 60); -struct ResponsesApiProxy { - child: Child, - port: u16, -} - -impl ResponsesApiProxy { - fn start(upstream_url: &str, dump_dir: &Path) -> Result { - let server_info = dump_dir.join("server-info.json"); - let (proxy_program, use_codex_multitool) = - match codex_utils_cargo_bin::cargo_bin("codex-responses-api-proxy") { - Ok(path) => (path, false), - Err(_) => (codex_utils_cargo_bin::cargo_bin("codex")?, true), - }; - let mut command = StdCommand::new(proxy_program); - if use_codex_multitool { - command.arg("responses-api-proxy"); - } - let mut child = command - .args(["--server-info"]) - .arg(&server_info) - .args(["--upstream-url", upstream_url, "--dump-dir"]) - .arg(dump_dir) - .stdin(Stdio::piped()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn()?; - - child - .stdin - .take() - .ok_or_else(|| anyhow!("responses-api-proxy stdin was not piped"))? - .write_all(b"dummy\n")?; - - let deadline = Instant::now() + PROXY_START_TIMEOUT; - loop { - match std::fs::read_to_string(&server_info) { - Ok(info) => { - if !info.trim().is_empty() { - match serde_json::from_str::(&info) { - Ok(info) => { - let port = info - .get("port") - .and_then(Value::as_u64) - .ok_or_else(|| anyhow!("proxy server info missing port"))?; - return Ok(Self { - child, - port: u16::try_from(port)?, - }); - } - Err(err) if err.is_eof() => {} - Err(err) => return Err(err.into()), - } - } - } - Err(err) if err.kind() == ErrorKind::NotFound => {} - Err(err) => return Err(err.into()), - } - if let Some(status) = child.try_wait()? { - return Err(anyhow!( - "responses-api-proxy exited before writing server info: {status}" - )); - } - if Instant::now() >= deadline { - return Err(anyhow!("timed out waiting for responses-api-proxy")); - } - std::thread::sleep(PROXY_POLL_INTERVAL); - } - } - - fn base_url(&self) -> String { - format!("http://127.0.0.1:{}/v1", self.port) - } -} - -impl Drop for ResponsesApiProxy { - fn drop(&mut self) { - let _ = self.child.kill(); - let _ = self.child.wait(); - } -} - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Result<()> { +async fn responses_api_parent_and_subagent_requests_include_identity_headers() -> Result<()> { skip_if_no_network!(Ok(())); let server = start_mock_server().await; - let dump_dir = TempDir::new()?; - let proxy = - ResponsesApiProxy::start(&format!("{}/v1/responses", server.uri()), dump_dir.path())?; let spawn_args = serde_json::to_string(&json!({ "message": CHILD_PROMPT }))?; - mount_sse_once_match( + let parent_mock = mount_sse_once_match( &server, - |req: &wiremock::Request| request_body_contains(req, PARENT_PROMPT), + |req: &wiremock::Request| { + request_body_contains(req, PARENT_PROMPT) + && request_header(req, "x-openai-subagent").is_none() + }, sse(vec![ ev_response_created("resp-parent-1"), ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args), @@ -141,10 +51,12 @@ async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Res ]), ) .await; - mount_sse_once_match( + let child_mock = mount_sse_once_match( &server, |req: &wiremock::Request| { - request_body_contains(req, CHILD_PROMPT) && !request_body_contains(req, SPAWN_CALL_ID) + request_body_contains(req, CHILD_PROMPT) + && !request_body_contains(req, SPAWN_CALL_ID) + && request_header(req, "x-openai-subagent").as_deref() == Some("collab_spawn") }, sse(vec![ ev_response_created("resp-child-1"), @@ -155,7 +67,10 @@ async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Res .await; mount_sse_once_match( &server, - |req: &wiremock::Request| request_body_contains(req, SPAWN_CALL_ID), + |req: &wiremock::Request| { + request_body_contains(req, SPAWN_CALL_ID) + && request_header(req, "x-openai-subagent").is_none() + }, sse(vec![ ev_response_created("resp-parent-2"), ev_assistant_message("msg-parent-2", "parent done"), @@ -164,50 +79,52 @@ async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Res ) .await; - let proxy_base_url = proxy.base_url(); - let mut builder = test_codex().with_config(move |config| { - config.model_provider.base_url = Some(proxy_base_url); + let mut builder = test_codex().with_config(|config| { config .features .disable(Feature::EnableRequestCompression) .expect("test config should allow feature update"); }); let test = builder.build(&server).await?; - submit_turn_with_timeout(&test, PARENT_PROMPT, dump_dir.path()).await?; + submit_turn_with_timeout(&test, PARENT_PROMPT).await?; - let dumps = wait_for_proxy_request_dumps(dump_dir.path())?; - let parent = dumps - .iter() - .find(|dump| dump_body_contains(dump, PARENT_PROMPT)) - .ok_or_else(|| anyhow!("missing parent request dump"))?; - let child = dumps - .iter() - .find(|dump| { - dump_body_contains(dump, CHILD_PROMPT) && !dump_body_contains(dump, SPAWN_CALL_ID) - }) - .ok_or_else(|| anyhow!("missing child request dump"))?; + let parent = wait_for_matching_request(&parent_mock, "parent request", |request| { + request.body_contains_text(PARENT_PROMPT) && request.header("x-openai-subagent").is_none() + }) + .await?; + let child = wait_for_matching_request(&child_mock, "child request", |request| { + request.body_contains_text(CHILD_PROMPT) + && !request.body_contains_text(SPAWN_CALL_ID) + && request.header("x-openai-subagent").as_deref() == Some("collab_spawn") + }) + .await?; - let parent_window_id = header(parent, "x-codex-window-id") + let parent_window_id = parent + .header("x-codex-window-id") .ok_or_else(|| anyhow!("parent request missing x-codex-window-id"))?; - let child_window_id = header(child, "x-codex-window-id") + let child_window_id = child + .header("x-codex-window-id") .ok_or_else(|| anyhow!("child request missing x-codex-window-id"))?; - let (parent_thread_id, parent_generation) = split_window_id(parent_window_id)?; - let (child_thread_id, child_generation) = split_window_id(child_window_id)?; + let (parent_thread_id, parent_generation) = split_window_id(&parent_window_id)?; + let (child_thread_id, child_generation) = split_window_id(&child_window_id)?; assert_eq!(parent_generation, 0); assert_eq!(child_generation, 0); assert!(child_thread_id != parent_thread_id); - assert_eq!(header(parent, "x-openai-subagent"), None); - assert_eq!(header(child, "x-openai-subagent"), Some("collab_spawn")); + assert_eq!(parent.header("x-openai-subagent"), None); + assert_eq!( + child.header("x-openai-subagent").as_deref(), + Some("collab_spawn") + ); assert_eq!( - header(child, "x-codex-parent-thread-id"), + child.header("x-codex-parent-thread-id").as_deref(), Some(parent_thread_id) ); Ok(()) } -async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Path) -> Result<()> { +async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()> { let session_model = test.session_configured.model.clone(); test.codex .submit(Op::UserTurn { @@ -235,14 +152,14 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Pat }) .await?; - let turn_started = wait_for_event_result(test, "turn started", dump_dir, |event| { + let turn_started = wait_for_event_result(test, "turn started", |event| { matches!(event, EventMsg::TurnStarted(_)) }) .await?; let EventMsg::TurnStarted(turn_started) = turn_started else { unreachable!("event predicate only matches turn started events"); }; - wait_for_event_result(test, "turn complete", dump_dir, |event| match event { + wait_for_event_result(test, "turn complete", |event| match event { EventMsg::TurnComplete(event) => event.turn_id == turn_started.turn_id, _ => false, }) @@ -251,10 +168,33 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Pat Ok(()) } +async fn wait_for_matching_request( + mock: &ResponseMock, + label: &str, + mut predicate: F, +) -> Result +where + F: FnMut(&ResponsesRequest) -> bool, +{ + tokio::time::timeout(TURN_TIMEOUT, async { + loop { + if let Some(request) = mock + .requests() + .into_iter() + .find(|request| predicate(request)) + { + return request; + } + tokio::time::sleep(REQUEST_POLL_INTERVAL).await; + } + }) + .await + .map_err(|_| anyhow!("timed out waiting for {label}")) +} + async fn wait_for_event_result( test: &TestCodex, stage: &str, - dump_dir: &Path, mut predicate: F, ) -> Result where @@ -273,9 +213,8 @@ where .await .map_err(|_| { anyhow!( - "timed out waiting for {stage}; saw events: {}; {}", - seen_events.join(" | "), - proxy_dump_summary(dump_dir) + "timed out waiting for {stage}; saw events: {}", + seen_events.join(" | ") ) })? } @@ -290,95 +229,8 @@ fn request_body_contains(req: &wiremock::Request, text: &str) -> bool { std::str::from_utf8(&req.body).is_ok_and(|body| body.contains(text)) } -fn wait_for_proxy_request_dumps(dump_dir: &Path) -> Result> { - let deadline = Instant::now() + Duration::from_secs(/*secs*/ 2); - loop { - let dumps = read_proxy_request_dumps(dump_dir).unwrap_or_default(); - if dumps.len() >= 3 - && dumps - .iter() - .any(|dump| dump_body_contains(dump, CHILD_PROMPT)) - { - return Ok(dumps); - } - if Instant::now() >= deadline { - return Err(anyhow!( - "timed out waiting for proxy request dumps, got {}", - dumps.len() - )); - } - std::thread::sleep(PROXY_POLL_INTERVAL); - } -} - -fn read_proxy_request_dumps(dump_dir: &Path) -> Result> { - let mut dumps = Vec::new(); - for entry in std::fs::read_dir(dump_dir)? { - let path = entry?.path(); - if path - .file_name() - .and_then(|name| name.to_str()) - .is_some_and(|name| name.ends_with("-request.json")) - { - let contents = std::fs::read_to_string(&path)?; - if contents.trim().is_empty() { - continue; - } - - match serde_json::from_str(&contents) { - Ok(dump) => dumps.push(dump), - Err(err) if err.is_eof() => continue, - Err(err) => return Err(err.into()), - } - } - } - Ok(dumps) -} - -fn proxy_dump_summary(dump_dir: &Path) -> String { - match read_proxy_request_dumps(dump_dir) { - Ok(dumps) => { - let bodies = dumps - .iter() - .filter_map(|dump| dump.get("body")) - .map(Value::to_string) - .collect::>() - .join("; "); - format!("proxy wrote {} request dumps: {bodies}", dumps.len()) - } - Err(err) => format!("failed to read proxy request dumps: {err}"), - } -} - -#[test] -fn read_proxy_request_dumps_ignores_in_progress_files() -> Result<()> { - let dump_dir = TempDir::new()?; - std::fs::write(dump_dir.path().join("empty-request.json"), "")?; - std::fs::write(dump_dir.path().join("partial-request.json"), "{\"body\"")?; - std::fs::write( - dump_dir.path().join("complete-request.json"), - serde_json::to_string(&json!({ "body": "ready" }))?, - )?; - - assert_eq!( - read_proxy_request_dumps(dump_dir.path())?, - vec![json!({ "body": "ready" })] - ); - - Ok(()) -} - -fn dump_body_contains(dump: &Value, text: &str) -> bool { - dump.get("body") - .is_some_and(|body| body.to_string().contains(text)) -} - -fn header<'a>(dump: &'a Value, name: &str) -> Option<&'a str> { - dump.get("headers")?.as_array()?.iter().find_map(|header| { - (header.get("name")?.as_str()?.eq_ignore_ascii_case(name)) - .then(|| header.get("value")?.as_str()) - .flatten() - }) +fn request_header<'a>(req: &'a wiremock::Request, name: &str) -> Option<&'a str> { + req.headers.get(name).and_then(|value| value.to_str().ok()) } fn split_window_id(window_id: &str) -> Result<(&str, u64)> { diff --git a/codex-rs/responses-api-proxy/src/dump.rs b/codex-rs/responses-api-proxy/src/dump.rs index bebd4ed5925..1a09c64ae7d 100644 --- a/codex-rs/responses-api-proxy/src/dump.rs +++ b/codex-rs/responses-api-proxy/src/dump.rs @@ -231,6 +231,12 @@ mod tests { Header::from_bytes(&b"Cookie"[..], &b"user-session=secret"[..]).expect("cookie header"), Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]) .expect("content-type header"), + Header::from_bytes(&b"x-codex-window-id"[..], &b"thread-1:0"[..]) + .expect("window id header"), + Header::from_bytes(&b"x-codex-parent-thread-id"[..], &b"parent-thread-1"[..]) + .expect("parent thread id header"), + Header::from_bytes(&b"x-openai-subagent"[..], &b"collab_spawn"[..]) + .expect("subagent header"), ]; let exchange_dump = dumper @@ -262,6 +268,18 @@ mod tests { { "name": "Content-Type", "value": "application/json" + }, + { + "name": "x-codex-window-id", + "value": "thread-1:0" + }, + { + "name": "x-codex-parent-thread-id", + "value": "parent-thread-1" + }, + { + "name": "x-openai-subagent", + "value": "collab_spawn" } ], "body": { From 83a6d21b2c135e6e7f9f10247018de08a8555979 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 15 Apr 2026 22:47:33 +0100 Subject: [PATCH 2/2] nit --- codex-rs/core/tests/suite/responses_api_proxy_headers.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core/tests/suite/responses_api_proxy_headers.rs b/codex-rs/core/tests/suite/responses_api_proxy_headers.rs index 2c4519ae749..67f6e86ab0e 100644 --- a/codex-rs/core/tests/suite/responses_api_proxy_headers.rs +++ b/codex-rs/core/tests/suite/responses_api_proxy_headers.rs @@ -56,7 +56,7 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() - |req: &wiremock::Request| { request_body_contains(req, CHILD_PROMPT) && !request_body_contains(req, SPAWN_CALL_ID) - && request_header(req, "x-openai-subagent").as_deref() == Some("collab_spawn") + && request_header(req, "x-openai-subagent") == Some("collab_spawn") }, sse(vec![ ev_response_created("resp-child-1"),