Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions src/boxlite/src/rest/litebox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ async fn attach_ws_pump(
// Sticky across reconnects: once the server has ever sent a frame the
// exec is real, so a later reconnect uses the steady-state watchdog.
let mut first_frame_seen = false;
// Set once we've already re-attached to drain the backlog after the exec
// went terminal. Bounds that drain to a single attempt so a runner that
// never sends a closing exit frame can't spin this loop.
let mut terminal_drain_attempted = false;

let mut current_stream = Some(initial_stream);

Expand Down Expand Up @@ -681,6 +685,28 @@ async fn attach_ws_pump(
// distinguish "exec really finished" from "transient WS drop".
match probe_execution_status(client, box_id, execution_id).await {
ProbeResult::Terminal(result) => {
// The exec finished, but the WS dropped before we saw its
// exit frame — so the runner may still hold the tail of
// stdout/stderr in its replay backlog that this attach never
// received. This is the silent-loss case over a proxied /
// high-latency link: a fast command exits before the first
// attach drains, the connection is cut, and the status probe
// reports "completed" with the output still buffered
// server-side. Emitting the exit code here would surface a
// clean exit with empty stdout.
//
// Re-attach once, immediately (the data is already there — no
// backoff), so the runner replays its backlog and we leave via
// its authoritative exit frame. Bounded to a single attempt;
// fall back to the probed result if we can't re-attach (reaped
// between probe and connect, or unreachable) so we never hang.
if !terminal_drain_attempted {
terminal_drain_attempted = true;
if let Ok(new_stream) = client.connect_ws(&path).await {
current_stream = Some(new_stream);
continue 'session;
}
}
let _ = result_tx.send(result);
return;
}
Expand Down Expand Up @@ -1374,4 +1400,135 @@ mod tests {
attach.await.unwrap();
server.abort();
}

// ─── ws_terminal_probe_after_cut_must_not_drop_buffered_stdout ───────
//
// Reproduces the cloud "fast command → empty stdout, exit 0" loss.
//
// Timeline of a proxied cloud attach to a command that has already
// finished (e.g. `uptime`):
// 1. The client opens `/attach`. The WS upgrade succeeds, but the
// connection is cut BEFORE the runner flushes the command's stdout
// — an idle-cut / latency stall in the proxy in front of the
// runner. The bytes are NOT gone server-side: they sit in the
// runner's per-exec backlog, replayed to any fresh subscriber.
// 2. The pump probes `GET /executions/{id}` → "completed", exit 0.
// 3. The buffered stdout ("hello\n") is still obtainable by
// re-attaching — the 2nd WS connection below replays it, exactly as
// the runner's stream backlog does for a late/reconnecting client.
//
// Bug: a `ProbeResult::Terminal` short-circuits the pump — it emits the
// exit code and `return`s WITHOUT re-attaching, so the backlog stdout is
// never drained. The consumer sees a clean exit with EMPTY stdout, and
// because the exit code is the correct 0 the drop is silent.
//
// This exercises the REST attach path (`attach_ws_pump` /
// `probe_execution_status`). PR #563 is an `sdks/c` FFI change that does
// not touch this file (`git diff main..HEAD -- src/boxlite/src/rest/\
// litebox.rs` is empty), so this test fails identically on `main` and on
// the #563 branch.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn ws_terminal_probe_after_cut_must_not_drop_buffered_stdout() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
let state: SharedState = Arc::new(Mutex::new(ServerState::default()));
let state_clone = state.clone();

// 1st WS upgrade: accept then close immediately, before any output
// frame (the proxy cut). GET status: completed / exit 0. 2nd WS
// upgrade: replay the runner backlog ("hello\n") then the exit frame,
// as a real re-attach would receive. The buggy client never opens
// the 2nd WS — it bails out on the Terminal status probe.
let server = tokio::spawn(async move {
let mut ws_conn = 0u32;
loop {
let (mut stream, _) = match listener.accept().await {
Ok(p) => p,
Err(_) => return,
};
let head = read_request_head(&mut stream).await;
let is_upgrade = String::from_utf8_lossy(&head)
.to_ascii_lowercase()
.contains("upgrade: websocket");
if is_upgrade {
ws_conn += 1;
let chained = ChainedStream {
head,
head_pos: 0,
inner: stream,
};
let mut ws = match tokio_tungstenite::accept_async(chained).await {
Ok(ws) => ws,
Err(_) => continue,
};
if ws_conn == 1 {
// Cut the connection before flushing buffered stdout.
let _ = ws.close(None).await;
} else {
// Re-attach: runner replays its backlog, then exits.
let mut frame = vec![0x01u8];
frame.extend_from_slice(b"hello\n");
let _ = ws.send(Message::Binary(frame)).await;
let _ = ws
.send(Message::Text(r#"{"type":"exit","exit_code":0}"#.into()))
.await;
let _ = ws.close(None).await;
}
} else {
let mut s = state_clone.lock().await;
s.status_calls += 1;
drop(s);
write_status_response(
&mut stream,
r#"{"execution_id":"exec1","status":"completed","exit_code":0}"#,
)
.await;
}
}
});

let client = client_for(port);
let (stdout_tx, mut stdout_rx) = mpsc::unbounded_channel::<String>();
let (stderr_tx, _stderr_rx) = mpsc::unbounded_channel::<String>();
let (_stdin_tx, stdin_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let (result_tx, mut result_rx) = mpsc::unbounded_channel::<ExecResult>();

let attach = tokio::spawn(async move {
attach_ws(
&client, "box1", "exec1", stdin_rx, stdout_tx, stderr_tx, result_tx,
)
.await;
});

// The pump finishes once it emits a terminal result. Bounded so a
// regression that hangs fails loudly instead of stalling CI.
tokio::time::timeout(Duration::from_secs(10), attach)
.await
.expect("attach pump did not finish")
.unwrap();

// Everything the consumer actually received after the pump returned.
let mut collected = String::new();
while let Ok(chunk) = stdout_rx.try_recv() {
collected.push_str(&chunk);
}
let res = result_rx
.try_recv()
.expect("pump emitted no terminal ExecResult");

// The exit code is (correctly) 0 — which is exactly why the loss is
// silent: callers trust the clean exit and never notice the drop.
assert_eq!(res.exit_code, 0, "exit code should surface as 0");

// The real assertion: the command's stdout must reach the consumer.
assert_eq!(
collected, "hello\n",
"buffered stdout was dropped — consumer saw empty stdout with a clean \
exit. The WS pump short-circuited on a Terminal status probe \
(rest/litebox.rs `ProbeResult::Terminal`) and returned without \
re-attaching to drain the runner's backlog."
);

server.abort();
}
}