diff --git a/crates/tui/src/tui/ui.rs b/crates/tui/src/tui/ui.rs index 9aa56b05a..ef3e05d37 100644 --- a/crates/tui/src/tui/ui.rs +++ b/crates/tui/src/tui/ui.rs @@ -2234,6 +2234,49 @@ async fn run_event_loop( } } } + // Detect engine task death mid-turn. When the engine task + // panics (caught by spawn_supervised's catch_unwind) or exits + // unexpectedly between TurnStarted and TurnComplete, the event + // channel's sender is dropped. The while-let loop above exits + // silently on Err, so we must check post-loop and recover the + // UI state immediately instead of waiting for the 300-second + // TURN_STALL_WATCHDOG_TIMEOUT. Use is_closed() rather than + // try_recv() so the probe never consumes a valid event. + if (app.is_loading || app.is_compacting || app.is_purging) && rx.is_closed() { + streaming_thinking::finalize_current(app); + app.finalize_streaming_assistant_as_interrupted(); + app.finalize_active_cell_as_interrupted(); + app.streaming_state.reset(); + app.streaming_message_index = None; + app.streaming_thinking_active_entry = None; + + app.is_loading = false; + app.is_compacting = false; + app.is_purging = false; + app.active_allowed_tools = None; + app.agent_progress.clear(); + app.agent_activity_started_at = None; + app.turn_started_at = None; + app.turn_last_activity_at = None; + app.runtime_turn_status = None; + app.runtime_turn_id = None; + app.dispatch_started_at = None; + app.user_scrolled_during_stream = false; + + // Preserve pending steers: same hard-fail recovery as + // TurnComplete::Failed — surface them in the visible queue + // so they are not silently lost. + for msg in app.drain_pending_steers() { + app.queue_message(msg); + } + + app.push_status_toast( + "Engine process has terminated unexpectedly.", + StatusToastLevel::Error, + None, + ); + app.needs_redraw = true; + } } if let Some(index) = app.streaming_message_index { let committed = app.streaming_state.commit_text(0); diff --git a/crates/tui/src/tui/ui/tests.rs b/crates/tui/src/tui/ui/tests.rs index 04d1dc4cc..78681d1e4 100644 --- a/crates/tui/src/tui/ui/tests.rs +++ b/crates/tui/src/tui/ui/tests.rs @@ -2607,6 +2607,92 @@ fn turn_liveness_recovers_stalled_in_progress_turn() { assert!(toast.text.contains("Turn stalled")); } +#[test] +fn engine_event_channel_disconnect_recovers_mid_turn_ui_state() { + // Simulate the event-loop post-check that detects engine task death. + // Creates a real mpsc channel, sets up app as if a turn is in progress, + // drops the sender (mirroring engine task exit in spawn_supervised), + // and verifies the post-loop recovery logic cleans up the UI state. + let (tx_event, rx) = tokio::sync::mpsc::channel::(256); + drop(tx_event); + + // Confirm the channel is closed + assert!(rx.is_closed()); + + let mut app = create_test_app(); + app.is_loading = true; + app.is_compacting = false; + app.is_purging = false; + app.runtime_turn_status = Some("in_progress".to_string()); + app.runtime_turn_id = Some("turn-42".to_string()); + app.streaming_message_index = Some(0); + app.user_scrolled_during_stream = true; + + // Verify pending steers are surfaced to the visible queue on reconnect + app.pending_steers.push_back(QueuedMessage { + display: "steer content".to_string(), + skill_instruction: None, + }); + assert_eq!(app.pending_steers.len(), 1); + assert_eq!(app.queued_message_count(), 0); + + // Apply the same post-loop logic from ui.rs + if (app.is_loading || app.is_compacting || app.is_purging) && rx.is_closed() { + streaming_thinking::finalize_current(&mut app); + app.finalize_streaming_assistant_as_interrupted(); + app.finalize_active_cell_as_interrupted(); + app.streaming_state.reset(); + app.streaming_message_index = None; + app.streaming_thinking_active_entry = None; + + app.is_loading = false; + app.is_compacting = false; + app.is_purging = false; + app.active_allowed_tools = None; + app.agent_progress.clear(); + app.agent_activity_started_at = None; + app.turn_started_at = None; + app.turn_last_activity_at = None; + app.runtime_turn_status = None; + app.runtime_turn_id = None; + app.dispatch_started_at = None; + app.user_scrolled_during_stream = false; + + for msg in app.drain_pending_steers() { + app.queue_message(msg); + } + + app.push_status_toast( + "Engine process has terminated unexpectedly.", + StatusToastLevel::Error, + None, + ); + app.needs_redraw = true; + } + + // Verify the fix: UI state is fully recovered + assert!(!app.is_loading, "loading must be cleared"); + assert!(!app.is_compacting, "compacting must be cleared"); + assert!(!app.is_purging, "purging must be cleared"); + assert!(app.active_allowed_tools.is_none()); + assert!(app.agent_progress.is_empty()); + assert!(app.agent_activity_started_at.is_none()); + assert!(app.runtime_turn_status.is_none(), "turn status cleared"); + assert!(app.runtime_turn_id.is_none(), "turn id cleared"); + assert!(app.streaming_message_index.is_none()); + assert!(!app.user_scrolled_during_stream); + assert!(app.needs_redraw, "needs_redraw must be set"); + let toast = app.status_toasts.back().expect("error toast pushed"); + assert_eq!(toast.level, StatusToastLevel::Error); + assert!(toast.text.contains("Engine process has terminated")); + + // Verify pending steers were preserved in the visible queue + assert!(app.pending_steers.is_empty(), "pending_steers drained"); + assert_eq!(app.queued_message_count(), 1, "steer requeued"); + let queued = app.pop_queued_message().expect("queued steer available"); + assert_eq!(queued.display, "steer content"); +} + #[test] fn fixed_model_auto_thinking_skips_auto_model_router() { let mut app = create_test_app();