Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions codex-rs/exec-server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use arc_swap::ArcSwap;
use codex_app_server_protocol::JSONRPCNotification;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::watch;

use tokio::time::timeout;
Expand All @@ -16,6 +17,7 @@ use crate::ProcessId;
use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecProcessEvent;
use crate::protocol::EXEC_CLOSED_METHOD;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
Expand Down Expand Up @@ -53,6 +55,7 @@ use crate::protocol::INITIALIZE_METHOD;
use crate::protocol::INITIALIZED_METHOD;
use crate::protocol::InitializeParams;
use crate::protocol::InitializeResponse;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
Expand All @@ -65,6 +68,7 @@ use crate::rpc::RpcClientEvent;

// Ten-second limit. By its name this bounds establishing the connection; the
// use site is not visible in this view — confirm.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
// Ten-second limit. By its name this bounds the initialize handshake; the use
// site is not visible in this view — confirm.
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
// Capacity handed to `broadcast::channel` when `SessionState::new` creates the
// per-session process event channel.
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;

impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
Expand Down Expand Up @@ -100,6 +104,7 @@ impl RemoteExecServerConnectArgs {

/// Client-side state for one process session: a wake counter, a pushed-event
/// channel, and the first recorded failure message.
pub(crate) struct SessionState {
// Watch channel holding a `u64`. `note_change` raises it to the highest
// sequence number seen so far; `set_failure` bumps it by one.
wake_tx: watch::Sender<u64>,
// Broadcast sender used by `publish_event`; `subscribe_events` hands out
// receivers created from it.
event_tx: broadcast::Sender<ExecProcessEvent>,
// Failure message stored by `set_failure`; `None` until a failure is recorded.
failure: Mutex<Option<String>>,
}

Expand Down Expand Up @@ -450,8 +455,10 @@ impl From<RpcCallError> for ExecServerError {
impl SessionState {
fn new() -> Self {
let (wake_tx, _wake_rx) = watch::channel(0);
let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**P1:** Buffer events until the first subscriber attaches

SessionState::new creates a broadcast channel and immediately drops the initial receiver. If a short-lived process outputs/exits before subscribe_events() is called, publish_event drops those events (no receivers), yet the sender remains alive on the process handle. An event-only consumer can then wait forever for Exited/Closed, which is a functional hang.

Useful? React with 👍 / 👎.

Self {
wake_tx,
event_tx,
failure: Mutex::new(None),
}
}
Expand All @@ -460,19 +467,31 @@ impl SessionState {
self.wake_tx.subscribe()
}

/// Creates a fresh receiver on this session's process event channel.
///
/// The receiver is produced by the broadcast sender, so it observes events
/// published after this call.
pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
    broadcast::Sender::subscribe(&self.event_tx)
}

/// Raises the wake counter to `seq` unless it is already higher.
///
/// The current value is copied out first so the watch borrow is released
/// before the new value is sent. A send error (no receivers) is ignored.
fn note_change(&self, seq: u64) {
    let current = *self.wake_tx.borrow();
    let _ = self.wake_tx.send(current.max(seq));
}

/// Pushes `event` to every current event subscriber.
///
/// Sending fails when there are no receivers; that result is deliberately
/// discarded, so the event is simply dropped in that case.
fn publish_event(&self, event: ExecProcessEvent) {
    drop(self.event_tx.send(event));
}

async fn set_failure(&self, message: String) {
let mut failure = self.failure.lock().await;
if failure.is_none() {
*failure = Some(message);
let should_publish = failure.is_none();
if should_publish {
*failure = Some(message.clone());
}
drop(failure);
let next = (*self.wake_tx.borrow()).saturating_add(1);
let _ = self.wake_tx.send(next);
if should_publish {
self.publish_event(ExecProcessEvent::Failed(message));
}
}

async fn failed_response(&self) -> Option<ReadResponse> {
Expand Down Expand Up @@ -505,6 +524,10 @@ impl Session {
self.state.subscribe()
}

/// Returns a receiver for this session's pushed process events by delegating
/// to the session state's `subscribe_events`.
pub(crate) fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
self.state.subscribe_events()
}

pub(crate) async fn read(
&self,
after_seq: Option<u64>,
Expand Down Expand Up @@ -628,13 +651,22 @@ async fn handle_server_notification(
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
if let Some(session) = inner.get_session(&params.process_id) {
session.note_change(params.seq);
session.publish_event(ExecProcessEvent::Output(ProcessOutputChunk {
seq: params.seq,
stream: params.stream,
chunk: params.chunk,
}));
}
}
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
if let Some(session) = inner.get_session(&params.process_id) {
session.note_change(params.seq);
session.publish_event(ExecProcessEvent::Exited {
seq: params.seq,
exit_code: params.exit_code,
});
}
}
EXEC_CLOSED_METHOD => {
Expand All @@ -645,6 +677,7 @@ async fn handle_server_notification(
let session = inner.remove_session(&params.process_id).await;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

**P1:** Keep the session routed until ordered terminal events arrive

On exec/closed, the client removes the session before forwarding the event. Server notifications are emitted from concurrent tasks, so closed can arrive before lower-seq exited/output notifications. After removal, those delayed notifications are dropped, so remote event subscribers can miss exit status or tail output.

Useful? React with 👍 / 👎.

if let Some(session) = session {
session.note_change(params.seq);
session.publish_event(ExecProcessEvent::Closed { seq: params.seq });
}
}
other => {
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/exec-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use local_file_system::LOCAL_FS;
pub use local_file_system::LocalFileSystem;
pub use process::ExecBackend;
pub use process::ExecProcess;
pub use process::ExecProcessEvent;
pub use process::StartedExecProcess;
pub use process_id::ProcessId;
pub use protocol::ExecClosedNotification;
Expand Down Expand Up @@ -66,6 +67,7 @@ pub use protocol::FsWriteFileParams;
pub use protocol::FsWriteFileResponse;
pub use protocol::InitializeParams;
pub use protocol::InitializeResponse;
pub use protocol::ProcessOutputChunk;
pub use protocol::ReadParams;
pub use protocol::ReadResponse;
pub use protocol::TerminateParams;
Expand Down
41 changes: 36 additions & 5 deletions codex-rs/exec-server/src/local_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use codex_utils_pty::ExecCommandSession;
use codex_utils_pty::TerminalSize;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::watch;

use crate::ExecBackend;
use crate::ExecProcess;
use crate::ExecProcessEvent;
use crate::ExecServerError;
use crate::ProcessId;
use crate::StartedExecProcess;
Expand Down Expand Up @@ -45,6 +47,7 @@ use crate::rpc::invalid_request;

// 1 MiB. By its name this caps the output bytes retained per process; the
// enforcement site is not visible in this view — confirm.
const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024;
// NOTE(review): presumably the capacity of the notification channel; the use
// site is not visible in this view — confirm.
const NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
// Capacity handed to `broadcast::channel` when `start_process` creates a
// process's event channel.
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
#[cfg(test)]
const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25);
#[cfg(not(test))]
Expand All @@ -66,6 +69,7 @@ struct RunningProcess {
next_seq: u64,
exit_code: Option<i32>,
wake_tx: watch::Sender<u64>,
event_tx: broadcast::Sender<ExecProcessEvent>,
output_notify: Arc<Notify>,
open_streams: usize,
closed: bool,
Expand All @@ -90,6 +94,7 @@ struct LocalExecProcess {
process_id: ProcessId,
backend: LocalProcess,
wake_tx: watch::Sender<u64>,
event_tx: broadcast::Sender<ExecProcessEvent>,
}

impl Default for LocalProcess {
Expand Down Expand Up @@ -139,7 +144,14 @@ impl LocalProcess {
async fn start_process(
&self,
params: ExecParams,
) -> Result<(ExecResponse, watch::Sender<u64>), JSONRPCErrorError> {
) -> Result<
(
ExecResponse,
watch::Sender<u64>,
broadcast::Sender<ExecProcessEvent>,
),
JSONRPCErrorError,
> {
let process_id = params.process_id.clone();
let (program, args) = params
.argv
Expand Down Expand Up @@ -199,6 +211,7 @@ impl LocalProcess {

let output_notify = Arc::new(Notify::new());
let (wake_tx, _wake_rx) = watch::channel(0);
let (event_tx, _event_rx) = broadcast::channel(PROCESS_EVENT_CHANNEL_CAPACITY);
{
let mut process_map = self.inner.processes.lock().await;
process_map.insert(
Expand All @@ -212,6 +225,7 @@ impl LocalProcess {
next_seq: 1,
exit_code: None,
wake_tx: wake_tx.clone(),
event_tx: event_tx.clone(),
output_notify: Arc::clone(&output_notify),
open_streams: 2,
closed: false,
Expand Down Expand Up @@ -248,13 +262,13 @@ impl LocalProcess {
output_notify,
));

Ok((ExecResponse { process_id }, wake_tx))
Ok((ExecResponse { process_id }, wake_tx, event_tx))
}

/// Starts a process and returns only the `ExecResponse`.
///
/// `start_process` also yields the wake and event senders; this entry point
/// does not hand them to the caller, so they are discarded here.
///
/// # Errors
///
/// Propagates the `JSONRPCErrorError` returned by `start_process`.
pub(crate) async fn exec(&self, params: ExecParams) -> Result<ExecResponse, JSONRPCErrorError> {
    let (response, _wake_tx, _event_tx) = self.start_process(params).await?;
    Ok(response)
}

pub(crate) async fn exec_read(
Expand Down Expand Up @@ -425,7 +439,7 @@ fn shell_environment_policy(env_policy: &ExecEnvPolicy) -> ShellEnvironmentPolic
#[async_trait]
impl ExecBackend for LocalProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
let (response, wake_tx) = self
let (response, wake_tx, event_tx) = self
.start_process(params)
.await
.map_err(map_handler_error)?;
Expand All @@ -434,6 +448,7 @@ impl ExecBackend for LocalProcess {
process_id: response.process_id,
backend: self.clone(),
wake_tx,
event_tx,
}),
})
}
Expand All @@ -449,6 +464,10 @@ impl ExecProcess for LocalExecProcess {
self.wake_tx.subscribe()
}

/// Creates a fresh receiver on this process handle's event channel.
fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
    broadcast::Sender::subscribe(&self.event_tx)
}

async fn read(
&self,
after_seq: Option<u64>,
Expand Down Expand Up @@ -549,11 +568,19 @@ async fn stream_output(
process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len());
}
let _ = process.wake_tx.send(seq);
let output = ProcessOutputChunk {
seq,
stream,
chunk: chunk.into(),
};
let _ = process
.event_tx
.send(ExecProcessEvent::Output(output.clone()));
ExecOutputDeltaNotification {
process_id: process_id.clone(),
seq,
stream,
chunk: chunk.into(),
chunk: output.chunk,
}
};
output_notify.notify_waiters();
Expand Down Expand Up @@ -581,6 +608,9 @@ async fn watch_exit(
process.next_seq += 1;
process.exit_code = Some(exit_code);
let _ = process.wake_tx.send(seq);
let _ = process
.event_tx
.send(ExecProcessEvent::Exited { seq, exit_code });
Some(ExecExitedNotification {
process_id: process_id.clone(),
seq,
Expand Down Expand Up @@ -641,6 +671,7 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
let seq = process.next_seq;
process.next_seq += 1;
let _ = process.wake_tx.send(seq);
let _ = process.event_tx.send(ExecProcessEvent::Closed { seq });
Some(ExecClosedNotification {
process_id: process_id.clone(),
seq,
Expand Down
26 changes: 26 additions & 0 deletions codex-rs/exec-server/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,50 @@
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::broadcast;
use tokio::sync::watch;

use crate::ExecServerError;
use crate::ProcessId;
use crate::protocol::ExecParams;
use crate::protocol::ProcessOutputChunk;
use crate::protocol::ReadResponse;
use crate::protocol::WriteResponse;

/// Value returned when a process has been started: a shared, type-erased
/// handle to the running process.
pub struct StartedExecProcess {
/// Handle used to read from, subscribe to, and otherwise drive the process
/// through the [`ExecProcess`] trait.
pub process: Arc<dyn ExecProcess>,
}

/// Pushed process events for consumers that want to follow process output as it
/// arrives instead of polling retained output with [`ExecProcess::read`].
///
/// The stream is scoped to one [`ExecProcess`] handle. `Output` events carry
/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while
/// `Closed` means all output streams have ended and no more output events will
/// arrive. `Failed` is used when the process session cannot continue, for
/// example because the remote executor connection disconnected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecProcessEvent {
/// One chunk of process output.
Output(ProcessOutputChunk),
/// The process exited with `exit_code`; `seq` is the sequence number
/// assigned to the exit notification.
Exited { seq: u64, exit_code: i32 },
/// All output streams have ended; `seq` is the sequence number assigned to
/// the close notification.
Closed { seq: u64 },
/// The session failed; the string is the failure message.
Failed(String),
}

/// Handle for an executor-managed process.
///
/// Implementations must support both retained-output reads and pushed events:
/// `read` is the request/response API for callers that want to page through
/// buffered output, while `subscribe_events` is the streaming API for callers
/// that want output and lifecycle changes delivered as they happen.
#[async_trait]
pub trait ExecProcess: Send + Sync {
fn process_id(&self) -> &ProcessId;

fn subscribe_wake(&self) -> watch::Receiver<u64>;

fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent>;

async fn read(
&self,
after_seq: Option<u64>,
Expand Down
6 changes: 6 additions & 0 deletions codex-rs/exec-server/src/remote_process.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::broadcast;
use tokio::sync::watch;
use tracing::trace;

use crate::ExecBackend;
use crate::ExecProcess;
use crate::ExecProcessEvent;
use crate::ExecServerError;
use crate::StartedExecProcess;
use crate::client::ExecServerClient;
Expand Down Expand Up @@ -56,6 +58,10 @@ impl ExecProcess for RemoteExecProcess {
self.session.subscribe_wake()
}

/// Returns a receiver for pushed process events by delegating to the
/// underlying session's `subscribe_events`.
fn subscribe_events(&self) -> broadcast::Receiver<ExecProcessEvent> {
self.session.subscribe_events()
}

async fn read(
&self,
after_seq: Option<u64>,
Expand Down
Loading
Loading