diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 2666f00ecdf..a8d2eabecc0 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -522,6 +522,43 @@ fn rate_limit_error_kind(info: &CodexErrorInfo) -> Option { } } +fn is_transient_turn_failure(codex_error_info: Option<&CodexErrorInfo>, message: &str) -> bool { + match codex_error_info { + Some( + CodexErrorInfo::ServerOverloaded + | CodexErrorInfo::HttpConnectionFailed { .. } + | CodexErrorInfo::ResponseStreamConnectionFailed { .. } + | CodexErrorInfo::InternalServerError + | CodexErrorInfo::ResponseStreamDisconnected { .. } + | CodexErrorInfo::ResponseTooManyFailedAttempts { .. }, + ) => true, + Some( + CodexErrorInfo::ContextWindowExceeded + | CodexErrorInfo::UsageLimitExceeded + | CodexErrorInfo::Unauthorized + | CodexErrorInfo::BadRequest + | CodexErrorInfo::SandboxError + | CodexErrorInfo::ThreadRollbackFailed, + ) => false, + Some(CodexErrorInfo::Other) | None => { + let message = message.to_ascii_lowercase(); + [ + "429", + "too many requests", + "retry limit", + "connection failed", + "temporarily unavailable", + "timeout", + "timed out", + "server overloaded", + "stream disconnected", + ] + .iter() + .any(|pattern| message.contains(pattern)) + } + } +} + #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] pub(crate) enum ExternalEditorState { #[default] @@ -633,6 +670,10 @@ pub(crate) struct ChatWidget { queued_user_messages: VecDeque, // Monotonic sequence source used when enqueuing `queued_user_messages`. next_queued_message_seq: u64, + // User message most recently submitted to core and still awaiting a terminal turn outcome. + in_flight_user_message: Option, + // Retries the failed in-flight message before advancing queued drafts after transient failures. + retry_current_user_message: Option, // RLPH `/exec` commands are submitted as user shell commands and bridged // back into a follow-up prompt on command completion. pending_rlph_exec_commands: VecDeque, @@ -1626,6 +1667,8 @@ impl ChatWidget { self.last_unified_wait = None; self.unified_exec_wait_streak = None; self.request_redraw(); + self.in_flight_user_message = None; + self.retry_current_user_message = None; let had_pending_steers = !self.pending_steers.is_empty(); self.refresh_pending_input_preview(); @@ -1921,13 +1964,20 @@ impl ChatWidget { self.add_to_history(history_cell::new_warning_event(message)); self.request_redraw(); + self.retry_current_user_message = self.in_flight_user_message.take(); self.maybe_send_next_queued_input(); } - fn on_error(&mut self, message: String) { + fn on_error(&mut self, message: String, retry_current_message: bool) { self.finalize_turn(); self.add_to_history(history_cell::new_error_event(message)); self.request_redraw(); + if retry_current_message { + self.retry_current_user_message = self.in_flight_user_message.take(); + } else { + self.in_flight_user_message = None; + self.retry_current_user_message = None; + } // After an error ends the turn, try sending the next queued input. self.maybe_send_next_queued_input(); @@ -2012,6 +2062,8 @@ impl ChatWidget { fn on_interrupted_turn(&mut self, reason: TurnAbortReason) { // Finalize, log a gentle prompt, and clear running state. self.finalize_turn(); + self.in_flight_user_message = None; + self.retry_current_user_message = None; if reason == TurnAbortReason::Interrupted { self.clear_unified_exec_processes(); } @@ -2187,6 +2239,8 @@ impl ChatWidget { self.queued_user_messages = input_state.pending_steers; self.queued_user_messages .extend(input_state.queued_user_messages); + self.in_flight_user_message = None; + self.retry_current_user_message = None; self.pending_rlph_exec_commands.clear(); self.active_rlph_exec_commands.clear(); } else { @@ -2201,6 +2255,8 @@ impl ChatWidget { ); self.bottom_pane.set_composer_pending_pastes(Vec::new()); self.queued_user_messages.clear(); + self.in_flight_user_message = None; + self.retry_current_user_message = None; self.pending_rlph_exec_commands.clear(); self.active_rlph_exec_commands.clear(); } @@ -3223,6 +3279,8 @@ impl ChatWidget { forked_from: None, queued_user_messages: VecDeque::new(), next_queued_message_seq: 0, + in_flight_user_message: None, + retry_current_user_message: None, pending_rlph_exec_commands: VecDeque::new(), active_rlph_exec_commands: HashMap::new(), pending_steers: VecDeque::new(), @@ -3412,6 +3470,8 @@ impl ChatWidget { plan_item_active: false, queued_user_messages: VecDeque::new(), next_queued_message_seq: 0, + in_flight_user_message: None, + retry_current_user_message: None, pending_rlph_exec_commands: VecDeque::new(), active_rlph_exec_commands: HashMap::new(), pending_steers: VecDeque::new(), @@ -3585,6 +3645,8 @@ impl ChatWidget { forked_from: None, queued_user_messages: VecDeque::new(), next_queued_message_seq: 0, + in_flight_user_message: None, + retry_current_user_message: None, pending_rlph_exec_commands: VecDeque::new(), active_rlph_exec_commands: HashMap::new(), pending_steers: VecDeque::new(), @@ -4483,6 +4545,8 @@ impl ChatWidget { if !self.is_session_configured() || self.bottom_pane.is_task_running() || self.is_review_mode + || self.in_flight_user_message.is_some() + || self.retry_current_user_message.is_some() { self.push_queued_user_message_back(user_message); self.refresh_pending_input_preview(); @@ -4613,6 +4677,10 @@ impl ChatWidget { } fn submit_user_message(&mut self, user_message: UserMessage) { + self.submit_user_message_internal(user_message, false); + } + + fn submit_user_message_internal(&mut self, user_message: UserMessage, is_retry: bool) { if !self.is_session_configured() { tracing::warn!("cannot submit user message before session is configured; queueing"); self.push_queued_user_message_front(user_message); @@ -4625,6 +4693,7 @@ impl ChatWidget { return; } + let submitted_user_message = user_message.clone(); let UserMessage { text, local_images, @@ -4816,6 +4885,13 @@ impl ChatWidget { if !self.submit_op(op) { return; } + self.in_flight_user_message = Some(submitted_user_message); + self.retry_current_user_message = None; + + if is_retry { + self.needs_final_message_separator = false; + return; + } // Persist the text to cross-session message history. if !text.is_empty() { @@ -5016,19 +5092,21 @@ impl ChatWidget { message, codex_error_info, }) => { - if let Some(info) = codex_error_info - && let Some(kind) = rate_limit_error_kind(&info) + let retry_current_message = + is_transient_turn_failure(codex_error_info.as_ref(), &message); + if let Some(info) = codex_error_info.as_ref() + && let Some(kind) = rate_limit_error_kind(info) { match kind { RateLimitErrorKind::ServerOverloaded => { self.on_server_overloaded_error(message) } RateLimitErrorKind::UsageLimit | RateLimitErrorKind::Generic => { - self.on_error(message) + self.on_error(message, retry_current_message) } } } else { - self.on_error(message); + self.on_error(message, retry_current_message); } } EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev), @@ -5040,7 +5118,7 @@ impl ChatWidget { TurnAbortReason::Replaced => { self.pending_steers.clear(); self.refresh_pending_input_preview(); - self.on_error("Turn aborted: replaced by a new task".to_owned()) + self.on_error("Turn aborted: replaced by a new task".to_owned(), false) } TurnAbortReason::ReviewEnded => { self.on_interrupted_turn(ev.reason); @@ -5347,6 +5425,17 @@ impl ChatWidget { if self.suppress_queue_autosend { return; } + if !self.bottom_pane.is_task_running() + && let Some(user_message) = self.retry_current_user_message.take() + { + self.submit_user_message_internal(user_message, true); + self.refresh_pending_input_preview(); + return; + } + if self.in_flight_user_message.is_some() { + self.refresh_pending_input_preview(); + return; + } if self.bottom_pane.is_task_running() && !self .queued_user_messages diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index bdbd3f0d4af..c56c4289920 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -62,6 +62,7 @@ use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::BackgroundEventEvent; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::CreditsSnapshot; +use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::ExecApprovalRequestEvent; @@ -1837,6 +1838,8 @@ async fn make_chatwidget_manual( startup_tooltip_override: None, queued_user_messages: VecDeque::new(), next_queued_message_seq: 0, + in_flight_user_message: None, + retry_current_user_message: None, pending_rlph_exec_commands: VecDeque::new(), active_rlph_exec_commands: HashMap::new(), pending_steers: VecDeque::new(), @@ -7171,6 +7174,287 @@ async fn server_overloaded_error_does_not_switch_models() { } } +#[tokio::test] +async fn transient_retry_limit_error_retries_current_message_before_advancing_queue() { + let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await; + chat.thread_id = Some(ThreadId::new()); + + chat.submit_user_message(UserMessage::from("first".to_string())); + match next_submit_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "first".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected Op::UserTurn, got {other:?}"), + } + + chat.handle_codex_event(Event { + id: "turn-start-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + chat.queue_user_message(UserMessage::from("second".to_string())); + + chat.handle_codex_event(Event { + id: "err-1".into(), + msg: EventMsg::Error(ErrorEvent { + message: "exceeded retry limit, last status: 429 Too Many Requests".to_string(), + codex_error_info: Some(CodexErrorInfo::ResponseTooManyFailedAttempts { + http_status_code: Some(429), + }), + }), + }); + + match next_submit_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "first".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected Op::UserTurn retry, got {other:?}"), + } + assert_eq!( + chat.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect::>(), + vec!["second".to_string()] + ); + + chat.handle_codex_event(Event { + id: "turn-start-2".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-2".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + chat.handle_codex_event(Event { + id: "turn-complete-2".into(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: "turn-2".to_string(), + last_agent_message: None, + }), + }); + + match next_submit_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "second".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected queued Op::UserTurn, got {other:?}"), + } +} + +#[tokio::test] +async fn transient_connection_error_retries_current_message_before_advancing_queue() { + let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await; + chat.thread_id = Some(ThreadId::new()); + + chat.submit_user_message(UserMessage::from("alpha".to_string())); + let _ = next_submit_op(&mut op_rx); + + chat.handle_codex_event(Event { + id: "turn-start-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + chat.queue_user_message(UserMessage::from("beta".to_string())); + + chat.handle_codex_event(Event { + id: "err-1".into(), + msg: EventMsg::Error(ErrorEvent { + message: "connection failed while streaming response".to_string(), + codex_error_info: Some(CodexErrorInfo::HttpConnectionFailed { + http_status_code: Some(502), + }), + }), + }); + + match next_submit_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "alpha".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected Op::UserTurn retry, got {other:?}"), + } + assert_eq!( + chat.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect::>(), + vec!["beta".to_string()] + ); +} + +#[tokio::test] +async fn repeated_retry_limit_errors_do_not_advance_queue_during_retry_gap() { + let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await; + chat.thread_id = Some(ThreadId::new()); + + chat.submit_user_message(UserMessage::from("first".to_string())); + let _ = next_submit_op(&mut op_rx); + + chat.handle_codex_event(Event { + id: "turn-start-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + chat.queue_user_message(UserMessage::from("second".to_string())); + + chat.handle_codex_event(Event { + id: "err-1".into(), + msg: EventMsg::Error(ErrorEvent { + message: "exceeded retry limit, last status: 429 Too Many Requests".to_string(), + codex_error_info: Some(CodexErrorInfo::ResponseTooManyFailedAttempts { + http_status_code: Some(429), + }), + }), + }); + let _ = next_submit_op(&mut op_rx); + + chat.maybe_send_next_queued_input(); + assert_no_submit_op(&mut op_rx); + assert_eq!( + chat.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect::>(), + vec!["second".to_string()] + ); + + chat.handle_codex_event(Event { + id: "err-2".into(), + msg: EventMsg::Error(ErrorEvent { + message: "exceeded retry limit, last status: 429 Too Many Requests".to_string(), + codex_error_info: Some(CodexErrorInfo::ResponseTooManyFailedAttempts { + http_status_code: Some(429), + }), + }), + }); + let _ = next_submit_op(&mut op_rx); + + chat.maybe_send_next_queued_input(); + assert_no_submit_op(&mut op_rx); + assert_eq!( + chat.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect::>(), + vec!["second".to_string()] + ); +} + +#[tokio::test] +async fn queue_user_message_during_retry_gap_stays_queued() { + let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await; + chat.thread_id = Some(ThreadId::new()); + + chat.submit_user_message(UserMessage::from("first".to_string())); + let _ = next_submit_op(&mut op_rx); + + chat.handle_codex_event(Event { + id: "turn-start-1".into(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: "turn-1".to_string(), + model_context_window: None, + collaboration_mode_kind: ModeKind::Default, + }), + }); + chat.handle_codex_event(Event { + id: "err-1".into(), + msg: EventMsg::Error(ErrorEvent { + message: "exceeded retry limit, last status: 429 Too Many Requests".to_string(), + codex_error_info: Some(CodexErrorInfo::ResponseTooManyFailedAttempts { + http_status_code: Some(429), + }), + }), + }); + let _ = next_submit_op(&mut op_rx); + + chat.queue_user_message(UserMessage::from("late queued".to_string())); + + assert_no_submit_op(&mut op_rx); + assert_eq!( + chat.queued_user_messages + .iter() + .map(|message| message.text.clone()) + .collect::>(), + vec!["late queued".to_string()] + ); +} + +#[tokio::test] +async fn transient_steer_retry_does_not_duplicate_pending_steers() { + let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await; + chat.thread_id = Some(ThreadId::new()); + + chat.submit_user_message(UserMessage { + text: "retry steer".to_string(), + local_images: Vec::new(), + remote_image_urls: Vec::new(), + text_elements: Vec::new(), + mention_bindings: Vec::new(), + repeat_mode: false, + steer_mode: true, + enqueue_seq: 0, + }); + let _ = next_submit_op(&mut op_rx); + assert_eq!(chat.pending_steers.len(), 1); + + chat.handle_codex_event(Event { + id: "err-1".into(), + msg: EventMsg::Error(ErrorEvent { + message: "response stream disconnected".to_string(), + codex_error_info: Some(CodexErrorInfo::ResponseStreamDisconnected { + http_status_code: Some(502), + }), + }), + }); + + match next_submit_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "retry steer".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected Op::UserTurn retry, got {other:?}"), + } + assert_eq!(chat.pending_steers.len(), 1); + + complete_user_message_for_inputs( + &mut chat, + "user-1", + vec![UserInput::Text { + text: "retry steer".to_string(), + text_elements: Vec::new(), + }], + ); + assert!(chat.pending_steers.is_empty()); +} + #[tokio::test] async fn approvals_selection_popup_snapshot() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;