From 0681926228a7e0a76b4da2819fa4817a35943c34 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Wed, 15 Apr 2026 15:44:53 -0700 Subject: [PATCH] Add pushed exec process events Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 37 ++++++++- codex-rs/exec-server/src/lib.rs | 2 + codex-rs/exec-server/src/local_process.rs | 41 +++++++-- codex-rs/exec-server/src/process.rs | 26 ++++++ codex-rs/exec-server/src/remote_process.rs | 6 ++ codex-rs/exec-server/tests/exec_process.rs | 97 +++++++++++++++++++++- 6 files changed, 199 insertions(+), 10 deletions(-) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 993d2ae018a..92afe90fdb4 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -6,6 +6,7 @@ use arc_swap::ArcSwap; use codex_app_server_protocol::JSONRPCNotification; use serde_json::Value; use tokio::sync::Mutex; +use tokio::sync::broadcast; use tokio::sync::watch; use tokio::time::timeout; @@ -16,6 +17,7 @@ use crate::ProcessId; use crate::client_api::ExecServerClientConnectOptions; use crate::client_api::RemoteExecServerConnectArgs; use crate::connection::JsonRpcConnection; +use crate::process::ExecProcessEvent; use crate::protocol::EXEC_CLOSED_METHOD; use crate::protocol::EXEC_EXITED_METHOD; use crate::protocol::EXEC_METHOD; @@ -53,6 +55,7 @@ use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; +use crate::protocol::ProcessOutputChunk; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; use crate::protocol::TerminateParams; @@ -65,6 +68,7 @@ use crate::rpc::RpcClientEvent; const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10); +const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256; impl Default for ExecServerClientConnectOptions { fn default() -> Self { @@ -100,6 +104,7 @@ impl RemoteExecServerConnectArgs { pub(crate) struct SessionState { wake_tx: watch::Sender, + event_tx: broadcast::Sender, failure: Mutex>, } @@ -450,8 +455,10 @@ impl From for ExecServerError { impl SessionState { fn new() -> Self { let (wake_tx, _wake_rx) = watch::channel(0); + let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY); Self { wake_tx, + event_tx, failure: Mutex::new(None), } } @@ -460,19 +467,31 @@ impl SessionState { self.wake_tx.subscribe() } + pub(crate) fn subscribe_events(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + fn note_change(&self, seq: u64) { let next = (*self.wake_tx.borrow()).max(seq); let _ = self.wake_tx.send(next); } + fn publish_event(&self, event: ExecProcessEvent) { + let _ = self.event_tx.send(event); + } + async fn set_failure(&self, message: String) { let mut failure = self.failure.lock().await; - if failure.is_none() { - *failure = Some(message); + let should_publish = failure.is_none(); + if should_publish { + *failure = Some(message.clone()); } drop(failure); let next = (*self.wake_tx.borrow()).saturating_add(1); let _ = self.wake_tx.send(next); + if should_publish { + self.publish_event(ExecProcessEvent::Failed(message)); + } } async fn failed_response(&self) -> Option { @@ -505,6 +524,10 @@ impl Session { self.state.subscribe() } + pub(crate) fn subscribe_events(&self) -> broadcast::Receiver { + self.state.subscribe_events() + } + pub(crate) async fn read( &self, after_seq: Option, @@ -628,6 +651,11 @@ async fn handle_server_notification( serde_json::from_value(notification.params.unwrap_or(Value::Null))?; if let Some(session) = inner.get_session(¶ms.process_id) { session.note_change(params.seq); + session.publish_event(ExecProcessEvent::Output(ProcessOutputChunk { + seq: params.seq, + stream: params.stream, + chunk: params.chunk, + })); } } EXEC_EXITED_METHOD => { @@ -635,6 +663,10 @@ async fn handle_server_notification( serde_json::from_value(notification.params.unwrap_or(Value::Null))?; if let Some(session) = inner.get_session(¶ms.process_id) { session.note_change(params.seq); + session.publish_event(ExecProcessEvent::Exited { + seq: params.seq, + exit_code: params.exit_code, + }); } } EXEC_CLOSED_METHOD => { @@ -645,6 +677,7 @@ async fn handle_server_notification( let session = inner.remove_session(¶ms.process_id).await; if let Some(session) = session { session.note_change(params.seq); + session.publish_event(ExecProcessEvent::Closed { seq: params.seq }); } } other => { diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index e7f4cc0e0c9..9d7e7a89424 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -39,6 +39,7 @@ pub use local_file_system::LOCAL_FS; pub use local_file_system::LocalFileSystem; pub use process::ExecBackend; pub use process::ExecProcess; +pub use process::ExecProcessEvent; pub use process::StartedExecProcess; pub use process_id::ProcessId; pub use protocol::ExecClosedNotification; @@ -66,6 +67,7 @@ pub use protocol::FsWriteFileParams; pub use protocol::FsWriteFileResponse; pub use protocol::InitializeParams; pub use protocol::InitializeResponse; +pub use protocol::ProcessOutputChunk; pub use protocol::ReadParams; pub use protocol::ReadResponse; pub use protocol::TerminateParams; diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index 50678436db0..b69a826aae5 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -12,11 +12,13 @@ use codex_utils_pty::ExecCommandSession; use codex_utils_pty::TerminalSize; use tokio::sync::Mutex; use tokio::sync::Notify; +use tokio::sync::broadcast; use tokio::sync::mpsc; use tokio::sync::watch; use crate::ExecBackend; use crate::ExecProcess; +use crate::ExecProcessEvent; use crate::ExecServerError; use crate::ProcessId; use crate::StartedExecProcess; @@ -45,6 +47,7 @@ use crate::rpc::invalid_request; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; const NOTIFICATION_CHANNEL_CAPACITY: usize = 256; +const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256; #[cfg(test)] const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25); #[cfg(not(test))] @@ -66,6 +69,7 @@ struct RunningProcess { next_seq: u64, exit_code: Option, wake_tx: watch::Sender, + event_tx: broadcast::Sender, output_notify: Arc, open_streams: usize, closed: bool, @@ -90,6 +94,7 @@ struct LocalExecProcess { process_id: ProcessId, backend: LocalProcess, wake_tx: watch::Sender, + event_tx: broadcast::Sender, } impl Default for LocalProcess { @@ -139,7 +144,14 @@ impl LocalProcess { async fn start_process( &self, params: ExecParams, - ) -> Result<(ExecResponse, watch::Sender), JSONRPCErrorError> { + ) -> Result< + ( + ExecResponse, + watch::Sender, + broadcast::Sender, + ), + JSONRPCErrorError, + > { let process_id = params.process_id.clone(); let (program, args) = params .argv @@ -199,6 +211,7 @@ impl LocalProcess { let output_notify = Arc::new(Notify::new()); let (wake_tx, _wake_rx) = watch::channel(0); + let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY); { let mut process_map = self.inner.processes.lock().await; process_map.insert( @@ -212,6 +225,7 @@ impl LocalProcess { next_seq: 1, exit_code: None, wake_tx: wake_tx.clone(), + event_tx: event_tx.clone(), output_notify: Arc::clone(&output_notify), open_streams: 2, closed: false, @@ -248,13 +262,13 @@ impl LocalProcess { output_notify, )); - Ok((ExecResponse { process_id }, wake_tx)) + Ok((ExecResponse { process_id }, wake_tx, event_tx)) } pub(crate) async fn exec(&self, params: ExecParams) -> Result { self.start_process(params) .await - .map(|(response, _)| response) + .map(|(response, _, _)| response) } pub(crate) async fn exec_read( @@ -425,7 +439,7 @@ fn shell_environment_policy(env_policy: &ExecEnvPolicy) -> ShellEnvironmentPolic #[async_trait] impl ExecBackend for LocalProcess { async fn start(&self, params: ExecParams) -> Result { - let (response, wake_tx) = self + let (response, wake_tx, event_tx) = self .start_process(params) .await .map_err(map_handler_error)?; @@ -434,6 +448,7 @@ impl ExecBackend for LocalProcess { process_id: response.process_id, backend: self.clone(), wake_tx, + event_tx, }), }) } @@ -449,6 +464,10 @@ impl ExecProcess for LocalExecProcess { self.wake_tx.subscribe() } + fn subscribe_events(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + async fn read( &self, after_seq: Option, @@ -549,11 +568,19 @@ async fn stream_output( process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len()); } let _ = process.wake_tx.send(seq); + let output = ProcessOutputChunk { + seq, + stream, + chunk: chunk.into(), + }; + let _ = process + .event_tx + .send(ExecProcessEvent::Output(output.clone())); ExecOutputDeltaNotification { process_id: process_id.clone(), seq, stream, - chunk: chunk.into(), + chunk: output.chunk, } }; output_notify.notify_waiters(); @@ -581,6 +608,9 @@ async fn watch_exit( process.next_seq += 1; process.exit_code = Some(exit_code); let _ = process.wake_tx.send(seq); + let _ = process + .event_tx + .send(ExecProcessEvent::Exited { seq, exit_code }); Some(ExecExitedNotification { process_id: process_id.clone(), seq, @@ -641,6 +671,7 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc) { let seq = process.next_seq; process.next_seq += 1; let _ = process.wake_tx.send(seq); + let _ = process.event_tx.send(ExecProcessEvent::Closed { seq }); Some(ExecClosedNotification { process_id: process_id.clone(), seq, diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index e455c77bdc6..90691b167bb 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; +use tokio::sync::broadcast; use tokio::sync::watch; use crate::ExecServerError; use crate::ProcessId; use crate::protocol::ExecParams; +use crate::protocol::ProcessOutputChunk; use crate::protocol::ReadResponse; use crate::protocol::WriteResponse; @@ -13,12 +15,36 @@ pub struct StartedExecProcess { pub process: Arc, } +/// Pushed process events for consumers that want to follow process output as it +/// arrives instead of polling retained output with [`ExecProcess::read`]. +/// +/// The stream is scoped to one [`ExecProcess`] handle. `Output` events carry +/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while +/// `Closed` means all output streams have ended and no more output events will +/// arrive. `Failed` is used when the process session cannot continue, for +/// example because the remote executor connection disconnected. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExecProcessEvent { + Output(ProcessOutputChunk), + Exited { seq: u64, exit_code: i32 }, + Closed { seq: u64 }, + Failed(String), +} + +/// Handle for an executor-managed process. +/// +/// Implementations must support both retained-output reads and pushed events: +/// `read` is the request/response API for callers that want to page through +/// buffered output, while `subscribe_events` is the streaming API for callers +/// that want output and lifecycle changes delivered as they happen. #[async_trait] pub trait ExecProcess: Send + Sync { fn process_id(&self) -> &ProcessId; fn subscribe_wake(&self) -> watch::Receiver; + fn subscribe_events(&self) -> broadcast::Receiver; + async fn read( &self, after_seq: Option, diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs index d3163b475ec..6246d4485dd 100644 --- a/codex-rs/exec-server/src/remote_process.rs +++ b/codex-rs/exec-server/src/remote_process.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; +use tokio::sync::broadcast; use tokio::sync::watch; use tracing::trace; use crate::ExecBackend; use crate::ExecProcess; +use crate::ExecProcessEvent; use crate::ExecServerError; use crate::StartedExecProcess; use crate::client::ExecServerClient; @@ -56,6 +58,10 @@ impl ExecProcess for RemoteExecProcess { self.session.subscribe_wake() } + fn subscribe_events(&self) -> broadcast::Receiver { + self.session.subscribe_events() + } + async fn read( &self, after_seq: Option, diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index f3f6d24dcd1..dd12343d694 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -7,8 +7,10 @@ use std::sync::Arc; use anyhow::Result; use codex_exec_server::Environment; use codex_exec_server::ExecBackend; +use codex_exec_server::ExecOutputStream; use codex_exec_server::ExecParams; use codex_exec_server::ExecProcess; +use codex_exec_server::ExecProcessEvent; use codex_exec_server::ExecStdinMode; use codex_exec_server::ProcessId; use codex_exec_server::ReadResponse; @@ -117,6 +119,40 @@ async fn collect_process_output_from_reads( Ok((output, exit_code, true)) } +async fn collect_process_output_from_events( + session: Arc, +) -> Result<(String, String, Option, bool)> { + let mut events = session.subscribe_events(); + let mut stdout = String::new(); + let mut stderr = String::new(); + let mut exit_code = None; + loop { + match timeout(Duration::from_secs(2), events.recv()).await?? { + ExecProcessEvent::Output(chunk) => match chunk.stream { + ExecOutputStream::Stdout | ExecOutputStream::Pty => { + stdout.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + } + ExecOutputStream::Stderr => { + stderr.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + } + }, + ExecProcessEvent::Exited { + seq: _, + exit_code: code, + } => { + exit_code = Some(code); + } + ExecProcessEvent::Closed { seq: _ } => { + drop(session); + return Ok((stdout, stderr, exit_code, true)); + } + ExecProcessEvent::Failed(message) => { + anyhow::bail!("process failed before closed state: {message}"); + } + } + } +} + async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> { let context = create_process_context(use_remote).await?; let process_id = "proc-stream".to_string(); @@ -148,6 +184,42 @@ async fn assert_exec_process_streams_output(use_remote: bool) -> Result<()> { Ok(()) } +async fn assert_exec_process_pushes_events(use_remote: bool) -> Result<()> { + let context = create_process_context(use_remote).await?; + let process_id = "proc-events".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "sleep 0.05; printf 'event output\\n'; printf 'event err\\n' >&2".to_string(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + stdin: ExecStdinMode::Closed, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let StartedExecProcess { process } = session; + let actual = collect_process_output_from_events(process).await?; + assert_eq!( + actual, + ( + "event output\n".to_string(), + "event err\n".to_string(), + Some(0), + true + ) + ); + Ok(()) +} + async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> { let context = create_process_context(use_remote).await?; let process_id = "proc-stdin".to_string(); @@ -238,15 +310,25 @@ async fn remote_exec_process_reports_transport_disconnect() -> Result<()> { }) .await?; + let process = Arc::clone(&session.process); + let mut events = process.subscribe_events(); let server = context .server .as_mut() .expect("remote context should include exec-server harness"); server.shutdown().await?; - let mut wake_rx = session.process.subscribe_wake(); - let response = - read_process_until_change(session.process, &mut wake_rx, /*after_seq*/ None).await?; + let event = timeout(Duration::from_secs(2), events.recv()).await??; + let ExecProcessEvent::Failed(event_message) = event else { + anyhow::bail!("expected process failure event, got {event:?}"); + }; + assert!( + event_message.starts_with("exec-server transport disconnected"), + "unexpected failure event: {event_message}" + ); + + let mut wake_rx = process.subscribe_wake(); + let response = read_process_until_change(process, &mut wake_rx, /*after_seq*/ None).await?; let message = response .failure .expect("disconnect should surface as a failure"); @@ -280,6 +362,15 @@ async fn exec_process_streams_output(use_remote: bool) -> Result<()> { assert_exec_process_streams_output(use_remote).await } +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_pushes_events(use_remote: bool) -> Result<()> { + assert_exec_process_pushes_events(use_remote).await +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]