From 3b0e1eca6612b8f06503149963f1bb5db9ebecc2 Mon Sep 17 00:00:00 2001 From: gamnaansong Date: Fri, 29 May 2026 07:57:07 +0000 Subject: [PATCH] fix(rest): re-attach to drain backlog before emitting terminal result MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A fast command over the cloud (Go runner) could return exit 0 with empty stdout. When the WS attach drops before the runner flushes output (a proxy / high-latency cut), attach_ws_pump probed GET /executions/{id}, saw "completed", and emitted the exit code WITHOUT re-attaching — dropping the stdout still sitting in the runner's replay backlog. Local `boxlite serve` masked it (low latency, the exit frame always arrives in-band). On a disconnect-then-Terminal probe, re-attach once (immediately, no backoff) so the runner replays its backlog and we leave via its authoritative exit frame. Bounded to a single attempt with fallback to the probed exit code, so a runner that never sends a closing exit frame can't hang the pump. Independent of #563 (an sdks/c FFI drain fix): this is the REST client path, so it reproduced on both main and #563. Reproducer: ws_terminal_probe_after_cut_must_not_drop_buffered_stdout drives the WS pump against a mock that cuts the first attach, reports completed/0, and serves the backlog only on re-attach. Without the fix it observes empty stdout with a clean exit; with the fix it drains "hello\n". Co-Authored-By: Claude Opus 4.7 (1M context) --- src/boxlite/src/rest/litebox.rs | 157 ++++++++++++++++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/src/boxlite/src/rest/litebox.rs b/src/boxlite/src/rest/litebox.rs index 76114c0ef..e0365f7a6 100644 --- a/src/boxlite/src/rest/litebox.rs +++ b/src/boxlite/src/rest/litebox.rs @@ -557,6 +557,10 @@ async fn attach_ws_pump( // Sticky across reconnects: once the server has ever sent a frame the // exec is real, so a later reconnect uses the steady-state watchdog. 
let mut first_frame_seen = false; + // Set once we've already re-attached to drain the backlog after the exec + // went terminal. Bounds that drain to a single attempt so a runner that + // never sends a closing exit frame can't spin this loop. + let mut terminal_drain_attempted = false; let mut current_stream = Some(initial_stream); @@ -681,6 +685,28 @@ async fn attach_ws_pump( // distinguish "exec really finished" from "transient WS drop". match probe_execution_status(client, box_id, execution_id).await { ProbeResult::Terminal(result) => { + // The exec finished, but the WS dropped before we saw its + // exit frame — so the runner may still hold the tail of + // stdout/stderr in its replay backlog that this attach never + // received. This is the silent-loss case over a proxied / + // high-latency link: a fast command exits before the first + // attach drains, the connection is cut, and the status probe + // reports "completed" with the output still buffered + // server-side. Emitting the exit code here would surface a + // clean exit with empty stdout. + // + // Re-attach once, immediately (the data is already there — no + // backoff), so the runner replays its backlog and we leave via + // its authoritative exit frame. Bounded to a single attempt; + // fall back to the probed result if we can't re-attach (reaped + // between probe and connect, or unreachable) so we never hang. + if !terminal_drain_attempted { + terminal_drain_attempted = true; + if let Ok(new_stream) = client.connect_ws(&path).await { + current_stream = Some(new_stream); + continue 'session; + } + } let _ = result_tx.send(result); return; } @@ -1374,4 +1400,135 @@ mod tests { attach.await.unwrap(); server.abort(); } + + // ─── ws_terminal_probe_after_cut_must_not_drop_buffered_stdout ─────── + // + // Reproduces the cloud "fast command → empty stdout, exit 0" loss. + // + // Timeline of a proxied cloud attach to a command that has already + // finished (e.g. `uptime`): + // 1. 
The client opens `/attach`. The WS upgrade succeeds, but the + // connection is cut BEFORE the runner flushes the command's stdout + // — an idle-cut / latency stall in the proxy in front of the + // runner. The bytes are NOT gone server-side: they sit in the + // runner's per-exec backlog, replayed to any fresh subscriber. + // 2. The pump probes `GET /executions/{id}` → "completed", exit 0. + // 3. The buffered stdout ("hello\n") is still obtainable by + // re-attaching — the 2nd WS connection below replays it, exactly as + // the runner's stream backlog does for a late/reconnecting client. + // + // Pre-fix bug: a `ProbeResult::Terminal` short-circuited the pump — it + // emitted the exit code and `return`ed WITHOUT re-attaching, so the backlog + // stdout was never drained. The consumer saw a clean exit with EMPTY stdout, + // and because the exit code was the correct 0 the drop was silent. + // + // This exercises the REST attach path (`attach_ws_pump` / + // `probe_execution_status`). PR #563 is an `sdks/c` FFI change that does + // not touch this file (`git diff main..HEAD -- src/boxlite/src/rest/\ + // litebox.rs` is empty), so without the fix this test fails identically + // on `main` and on the #563 branch. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn ws_terminal_probe_after_cut_must_not_drop_buffered_stdout() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let state: SharedState = Arc::new(Mutex::new(ServerState::default())); + let state_clone = state.clone(); + + // 1st WS upgrade: accept then close immediately, before any output + // frame (the proxy cut). GET status: completed / exit 0. 2nd WS + // upgrade: replay the runner backlog ("hello\n") then the exit frame, + // as a real re-attach would receive. The pre-fix client never opened + // the 2nd WS — it bailed out on the Terminal status probe. 
+ let server = tokio::spawn(async move { + let mut ws_conn = 0u32; + loop { + let (mut stream, _) = match listener.accept().await { + Ok(p) => p, + Err(_) => return, + }; + let head = read_request_head(&mut stream).await; + let is_upgrade = String::from_utf8_lossy(&head) + .to_ascii_lowercase() + .contains("upgrade: websocket"); + if is_upgrade { + ws_conn += 1; + let chained = ChainedStream { + head, + head_pos: 0, + inner: stream, + }; + let mut ws = match tokio_tungstenite::accept_async(chained).await { + Ok(ws) => ws, + Err(_) => continue, + }; + if ws_conn == 1 { + // Cut the connection before flushing buffered stdout. + let _ = ws.close(None).await; + } else { + // Re-attach: runner replays its backlog, then exits. + let mut frame = vec![0x01u8]; + frame.extend_from_slice(b"hello\n"); + let _ = ws.send(Message::Binary(frame)).await; + let _ = ws + .send(Message::Text(r#"{"type":"exit","exit_code":0}"#.into())) + .await; + let _ = ws.close(None).await; + } + } else { + let mut s = state_clone.lock().await; + s.status_calls += 1; + drop(s); + write_status_response( + &mut stream, + r#"{"execution_id":"exec1","status":"completed","exit_code":0}"#, + ) + .await; + } + } + }); + + let client = client_for(port); + let (stdout_tx, mut stdout_rx) = mpsc::unbounded_channel::<String>(); + let (stderr_tx, _stderr_rx) = mpsc::unbounded_channel::<String>(); + let (_stdin_tx, stdin_rx) = mpsc::unbounded_channel::<Vec<u8>>(); + let (result_tx, mut result_rx) = mpsc::unbounded_channel::<ExecResult>(); + + let attach = tokio::spawn(async move { + attach_ws( + &client, "box1", "exec1", stdin_rx, stdout_tx, stderr_tx, result_tx, + ) + .await; + }); + + // The pump finishes once it emits a terminal result. Bounded so a + // regression that hangs fails loudly instead of stalling CI. + tokio::time::timeout(Duration::from_secs(10), attach) + .await + .expect("attach pump did not finish") + .unwrap(); + + // Everything the consumer actually received after the pump returned. 
+ let mut collected = String::new(); + while let Ok(chunk) = stdout_rx.try_recv() { + collected.push_str(&chunk); + } + let res = result_rx + .try_recv() + .expect("pump emitted no terminal ExecResult"); + + // The exit code is (correctly) 0 — which is exactly why the loss is + // silent: callers trust the clean exit and never notice the drop. + assert_eq!(res.exit_code, 0, "exit code should surface as 0"); + + // The real assertion: the command's stdout must reach the consumer. + assert_eq!( + collected, "hello\n", + "buffered stdout was dropped — consumer saw empty stdout with a clean \ + exit. The WS pump short-circuited on a Terminal status probe \ + (rest/litebox.rs `ProbeResult::Terminal`) and returned without \ + re-attaching to drain the runner's backlog." + ); + + server.abort(); + } }