Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 112 additions & 12 deletions src-tauri/src/assistant/local_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,56 @@ struct MidRunDelivery {
interrupted: bool,
}

/// Slot accounting for mid-run injected turns (Mechanism B): how many
/// delivered messages still owe a completed turn, and whether the original
/// prompt turn has ended.
///
/// The invariant behind `turn_ended`: Claude Code emits exactly one `result`
/// per turn, and turns run strictly in order — the original prompt turn
/// first, then one turn per injected user line. So the FIRST result always
/// belongs to the original turn (which never held a slot), and every later
/// result completes an injected turn and frees its slot. That holds even for
/// the wind-down `result` of a turn we interrupted on purpose: the turn that
/// got cut may itself be an injected one (a second queued message arriving
/// while the first one's turn runs). An earlier version skipped the decrement
/// for every interrupted wind-down, leaving the count one too high in that
/// case — the run then waited forever for a turn that was never coming
/// (stdin held open, no EOF: a zombie run with an eternal spinner).
struct InjectedTurnLedger {
pending: usize,
original_turn_completed: bool,
}

impl InjectedTurnLedger {
fn new() -> Self {
Self {
pending: 0,
original_turn_completed: false,
}
}

/// Record `count` queued messages handed to the live process; each owes
/// one future turn.
fn delivered(&mut self, count: usize) {
self.pending += count;
}

/// Record a `result` event ending the current turn (normally or via our
/// interrupt) and return how many injected turns are still owed.
fn turn_ended(&mut self) -> usize {
if self.original_turn_completed {
self.pending = self.pending.saturating_sub(1);
}
self.original_turn_completed = true;
self.pending
}

/// Injected turns still owed; stdin must stay open while any remain.
fn pending(&self) -> usize {
self.pending
}
}

/// Best-effort delivery of pending queued messages into a live Claude Code
/// process (Mechanism B). When a turn is in flight, an interrupt control
/// request is written first so the agent winds down and re-plans with the
Expand Down Expand Up @@ -950,11 +1000,11 @@ async fn run_claude_turn(
// → its `result`) so delivery knows whether an interrupt is needed.
// `awaiting_interrupt_result` marks that the next error_during_execution
// result is OUR interrupt winding the turn down, not a failure.
// `pending_injected_turns` counts delivered messages whose turns have
// not yet completed — stdin must stay open until it reaches zero.
// `injected_turns` tracks delivered messages whose turns have not yet
// completed — stdin must stay open while any remain.
let mut turn_active = false;
let mut awaiting_interrupt_result = false;
let mut pending_injected_turns: usize = 0;
let mut injected_turns = InjectedTurnLedger::new();
let mut queue_poll = tokio::time::interval(std::time::Duration::from_millis(1200));
queue_poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

Expand Down Expand Up @@ -982,7 +1032,7 @@ async fn run_claude_turn(
)
.await;
if outcome.delivered > 0 {
pending_injected_turns += outcome.delivered;
injected_turns.delivered(outcome.delivered);
if outcome.interrupted {
awaiting_interrupt_result = true;
}
Expand Down Expand Up @@ -1041,16 +1091,17 @@ async fn run_claude_turn(

if event_type == Some("result") {
awaiting_interrupt_result = false;
// Every `result` ends exactly one turn, interrupted or not, and
// the ledger frees the slot that turn held (if any). The
// wind-down of a turn we cut on purpose is no exception: the
// turn that got cut may itself have been an injected one — see
// `InjectedTurnLedger::turn_ended` for the full story.
injected_turns.turn_ended();
if live_stdin.is_some() {
// The interrupted turn's wind-down doesn't consume an
// injected-turn slot; every other result completes a turn.
if !suppress_interrupted {
pending_injected_turns = pending_injected_turns.saturating_sub(1);
}
// End-of-run race: a message may have been queued since the
// last poll tick. Fold it in as one more turn now instead of
// closing; if delivery declines, the followup run has it.
if pending_injected_turns == 0 {
if injected_turns.pending() == 0 {
if let Some(stdin) = live_stdin.as_mut() {
let outcome = try_deliver_queued_to_claude(
deps,
Expand All @@ -1061,10 +1112,10 @@ async fn run_claude_turn(
false,
)
.await;
pending_injected_turns += outcome.delivered;
injected_turns.delivered(outcome.delivered);
}
}
if pending_injected_turns > 0 {
if injected_turns.pending() > 0 {
// More turns are coming in this process. Close out the
// current assistant bubble so the injected user message
// and the upcoming reply render in conversation order.
Expand Down Expand Up @@ -3683,6 +3734,55 @@ mod tests {
assert!(!is_interrupted_turn_result(&other));
}

#[test]
fn injected_turn_ledger_double_interrupt_frees_the_cut_turns_slot() {
// Regression: a second queued message interrupting the FIRST
// injected message's turn. The interrupted wind-down used to skip
// the decrement unconditionally, leaving the count one turn over —
// the run then held stdin open waiting for a result that was never
// coming (zombie run / eternal spinner, observed 2026-06-11).
let mut ledger = InjectedTurnLedger::new();

// Message #1 interrupts the original prompt turn.
ledger.delivered(1);
// Wind-down of the ORIGINAL turn: held no slot, nothing freed.
assert_eq!(ledger.turn_ended(), 1);

// Message #2 interrupts injected turn #1 while it is running.
ledger.delivered(1);
// Wind-down of injected turn #1: ITS slot must be freed.
assert_eq!(ledger.turn_ended(), 1);

// Injected turn #2 completes: all slots drained, stdin may close.
assert_eq!(ledger.turn_ended(), 0);
}

#[test]
fn injected_turn_ledger_single_interrupt() {
let mut ledger = InjectedTurnLedger::new();
ledger.delivered(1); // interrupts the original turn
assert_eq!(ledger.turn_ended(), 1); // original wind-down: no slot
assert_eq!(ledger.turn_ended(), 0); // injected turn done
}

#[test]
fn injected_turn_ledger_delivery_before_first_turn_event() {
// A poll tick can deliver before the original turn streams its
// first event (no interrupt sent, the turn looked idle). The
// original turn's own result must not consume that message's slot.
let mut ledger = InjectedTurnLedger::new();
ledger.delivered(1);
assert_eq!(ledger.turn_ended(), 1); // original turn ends normally
assert_eq!(ledger.turn_ended(), 0); // injected turn ends
}

#[test]
fn injected_turn_ledger_no_injections() {
let mut ledger = InjectedTurnLedger::new();
assert_eq!(ledger.pending(), 0);
assert_eq!(ledger.turn_ended(), 0); // original turn only
}

#[test]
fn tool_in_flight_tracking_follows_tool_use_lifecycle() {
let mut state = ClaudeStreamState::new();
Expand Down
Loading