From 2378e9e9ed3db0411c8d1f2e85cc88a5a86ba8d6 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sat, 17 Jan 2026 18:35:28 -0600 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=A4=96=20fix:=20restart=20stalled=20c?= =?UTF-8?q?hat=20subscriptions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/browser/components/ChatPane.tsx | 4 +- src/browser/stores/WorkspaceStore.ts | 103 +++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 7 deletions(-) diff --git a/src/browser/components/ChatPane.tsx b/src/browser/components/ChatPane.tsx index 086ac47693..7e772a0659 100644 --- a/src/browser/components/ChatPane.tsx +++ b/src/browser/components/ChatPane.tsx @@ -374,6 +374,8 @@ export const ChatPane: React.FC = (props) => { }, [setEditingMessage]); const handleMessageSent = useCallback(() => { + storeRaw.noteOutboundMessage(workspaceId); + // Auto-background any running foreground bash when user sends a new message // This prevents the user from waiting for the bash to complete before their message is processed autoBackgroundOnSend(); @@ -384,7 +386,7 @@ export const ChatPane: React.FC = (props) => { // Reset autoRetry when user sends a message // User action = clear intent: "I'm actively using this workspace" setAutoRetry(true); - }, [setAutoScroll, setAutoRetry, autoBackgroundOnSend]); + }, [storeRaw, workspaceId, setAutoScroll, setAutoRetry, autoBackgroundOnSend]); const handleClearHistory = useCallback( async (percentage = 1.0) => { diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 77987cd7ef..13f92eb35e 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -204,6 +204,8 @@ interface WorkspaceChatTransientState { replayingHistory: boolean; queuedMessage: QueuedMessage | null; liveBashOutput: Map; + lastEventAt: number; + lastOutboundAt: number | null; } function createInitialChatTransientState(): WorkspaceChatTransientState { @@ -214,9 +216,16 @@ function createInitialChatTransientState(): WorkspaceChatTransientState { replayingHistory: false, queuedMessage: null, liveBashOutput: new Map(), + lastEventAt: Date.now(), + lastOutboundAt: null, }; } +const ON_CHAT_PENDING_START_TIMEOUT_MS = 15000; +const ON_CHAT_QUEUED_TIMEOUT_MS = 300000; +const ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS = 60000; +const ON_CHAT_STALL_CHECK_MS = 5000; + const ON_CHAT_RETRY_BASE_MS = 250; const ON_CHAT_RETRY_MAX_MS = 5000; @@ -1265,7 +1274,11 @@ export class WorkspaceStore { aggregator.clear(); // Reset per-workspace transient state so the next replay rebuilds from the backend source of truth. - this.chatTransientState.set(workspaceId, createInitialChatTransientState()); + // Preserve outbound send expectations so stalled subscriptions keep retrying until events arrive. + const previousOutboundAt = this.chatTransientState.get(workspaceId)?.lastOutboundAt ?? null; + const nextTransient = createInitialChatTransientState(); + nextTransient.lastOutboundAt = previousOutboundAt; + this.chatTransientState.set(workspaceId, nextTransient); this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); @@ -1284,8 +1297,61 @@ export class WorkspaceStore { return; } + const attemptController = new AbortController(); + const { signal: attemptSignal } = attemptController; + const abortAttempt = () => attemptController.abort(); + signal.addEventListener("abort", abortAttempt, { once: true }); + + let stalled = false; + let wasExpecting = false; + + const watchdog = setInterval(() => { + if (signal.aborted || attemptSignal.aborted) { + return; + } + + const aggregator = this.aggregators.get(workspaceId); + const transient = this.chatTransientState.get(workspaceId); + if (!aggregator || !transient) { + return; + } + + const now = Date.now(); + const hasActiveStream = aggregator.getActiveStreams().length > 0; + const hasQueuedMessage = transient.queuedMessage !== null; + const pendingStreamStartTime = aggregator.getPendingStreamStartTime(); + const waitingForStreamStart = pendingStreamStartTime !== null && !hasQueuedMessage; + const hasOutbound = transient.lastOutboundAt !== null; + + const shouldExpect = + hasActiveStream || hasQueuedMessage || waitingForStreamStart || hasOutbound; + + if (!shouldExpect) { + wasExpecting = false; + return; + } + + if (!wasExpecting) { + wasExpecting = true; + transient.lastEventAt = now; + return; + } + + const timeoutMs = hasActiveStream + ? ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS + : hasQueuedMessage + ? ON_CHAT_QUEUED_TIMEOUT_MS + : ON_CHAT_PENDING_START_TIMEOUT_MS; + + if (now - transient.lastEventAt > timeoutMs) { + stalled = true; + console.warn(`[WorkspaceStore] onChat stalled for ${workspaceId}; restarting...`); + attemptController.abort(); + } + }, ON_CHAT_STALL_CHECK_MS); + try { - const iterator = await client.workspace.onChat({ workspaceId }, { signal }); + const iterator = await client.workspace.onChat({ workspaceId }, { signal: attemptSignal }); for await (const data of iterator) { if (signal.aborted) { @@ -1295,6 +1361,12 @@ export class WorkspaceStore { // Connection is alive again - don't carry old backoff into the next failure. attempt = 0; + const transient = this.chatTransientState.get(workspaceId); + if (transient) { + transient.lastEventAt = Date.now(); + transient.lastOutboundAt = null; + } + queueMicrotask(() => { this.handleChatMessage(workspaceId, data); }); @@ -1305,9 +1377,11 @@ export class WorkspaceStore { return; } - console.warn( - `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` - ); + if (!stalled) { + console.warn( + `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` + ); + } } catch (error) { // Suppress errors when subscription was intentionally cleaned up if (signal.aborted) { @@ -1318,7 +1392,9 @@ export class WorkspaceStore { // 1. Schema validation fails (event doesn't match WorkspaceChatMessageSchema) // 2. Workspace was removed on server side (iterator ends with error) // 3. Connection dropped (WebSocket/MessagePort error) - if (isIteratorValidationFailed(error)) { + if (attemptSignal.aborted && stalled) { + // Watchdog requested restart; skip noisy logging. + } else if (isIteratorValidationFailed(error)) { // Only suppress if workspace no longer exists (was removed during the race) if (!this.isWorkspaceSubscribed(workspaceId)) { return; @@ -1330,6 +1406,9 @@ export class WorkspaceStore { } else { console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); } + } finally { + clearInterval(watchdog); + signal.removeEventListener("abort", abortAttempt); } const delayMs = calculateOnChatBackoffMs(attempt); @@ -1577,6 +1656,16 @@ export class WorkspaceStore { this.fileModifyingToolSubs.bump(workspaceId); } + /** + * Record that the user sent a message so we can detect stalled onChat streams. + */ + noteOutboundMessage(workspaceId: string): void { + const transient = this.assertChatTransientState(workspaceId); + const now = Date.now(); + transient.lastOutboundAt = now; + transient.lastEventAt = now; + } + // Private methods /** @@ -1624,6 +1713,8 @@ export class WorkspaceStore { const aggregator = this.assertGet(workspaceId); const transient = this.assertChatTransientState(workspaceId); + transient.lastEventAt = Date.now(); + transient.lastOutboundAt = null; if (isCaughtUpMessage(data)) { // Check if there's an active stream in buffered events (reconnection scenario) From 234afd0fe78a4f623ffcea677b63e9da308de1a4 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 18 Jan 2026 10:35:59 -0600 Subject: [PATCH 2/4] fix: centralize onChat watchdog logic --- src/browser/stores/WorkspaceStore.ts | 133 +++++++++++++++++---------- 1 file changed, 83 insertions(+), 50 deletions(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 13f92eb35e..4c40e5abb4 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1284,6 +1284,81 @@ export class WorkspaceStore { this.checkAndBumpRecencyIfChanged(); } + private getOnChatWatchdogExpectation( + workspaceId: string + ): { timeoutMs: number; transient: WorkspaceChatTransientState } | null { + const aggregator = this.aggregators.get(workspaceId); + const transient = this.chatTransientState.get(workspaceId); + if (!aggregator || !transient) { + return null; + } + + const hasActiveStream = aggregator.getActiveStreams().length > 0; + const hasQueuedMessage = transient.queuedMessage !== null; + const pendingStreamStartTime = aggregator.getPendingStreamStartTime(); + const waitingForStreamStart = pendingStreamStartTime !== null && !hasQueuedMessage; + const hasOutbound = transient.lastOutboundAt !== null; + + const shouldExpect = + hasActiveStream || hasQueuedMessage || waitingForStreamStart || hasOutbound; + + if (!shouldExpect) { + return null; + } + + const timeoutMs = hasActiveStream + ? ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS + : hasQueuedMessage + ? ON_CHAT_QUEUED_TIMEOUT_MS + : ON_CHAT_PENDING_START_TIMEOUT_MS; + + return { timeoutMs, transient }; + } + + private startOnChatWatchdog( + workspaceId: string, + options: { + signal: AbortSignal; + attemptSignal: AbortSignal; + abortAttempt: () => void; + } + ): { didStall: () => boolean; stop: () => void } { + let stalled = false; + let wasExpecting = false; + + const watchdog = setInterval(() => { + if (options.signal.aborted || options.attemptSignal.aborted) { + return; + } + + const expectation = this.getOnChatWatchdogExpectation(workspaceId); + if (!expectation) { + wasExpecting = false; + return; + } + + const now = Date.now(); + const { timeoutMs, transient } = expectation; + + if (!wasExpecting) { + wasExpecting = true; + transient.lastEventAt = now; + return; + } + + if (now - transient.lastEventAt > timeoutMs) { + stalled = true; + console.warn(`[WorkspaceStore] onChat stalled for ${workspaceId}; restarting...`); + options.abortAttempt(); + } + }, ON_CHAT_STALL_CHECK_MS); + + return { + didStall: () => stalled, + stop: () => clearInterval(watchdog), + }; + } + /** * Subscribe to workspace chat events (history replay + live streaming). * Retries on unexpected iterator termination to avoid requiring a full app restart. @@ -1302,53 +1377,11 @@ export class WorkspaceStore { const abortAttempt = () => attemptController.abort(); signal.addEventListener("abort", abortAttempt, { once: true }); - let stalled = false; - let wasExpecting = false; - - const watchdog = setInterval(() => { - if (signal.aborted || attemptSignal.aborted) { - return; - } - - const aggregator = this.aggregators.get(workspaceId); - const transient = this.chatTransientState.get(workspaceId); - if (!aggregator || !transient) { - return; - } - - const now = Date.now(); - const hasActiveStream = aggregator.getActiveStreams().length > 0; - const hasQueuedMessage = transient.queuedMessage !== null; - const pendingStreamStartTime = aggregator.getPendingStreamStartTime(); - const waitingForStreamStart = pendingStreamStartTime !== null && !hasQueuedMessage; - const hasOutbound = transient.lastOutboundAt !== null; - - const shouldExpect = - hasActiveStream || hasQueuedMessage || waitingForStreamStart || hasOutbound; - - if (!shouldExpect) { - wasExpecting = false; - return; - } - - if (!wasExpecting) { - wasExpecting = true; - transient.lastEventAt = now; - return; - } - - const timeoutMs = hasActiveStream - ? ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS - : hasQueuedMessage - ? ON_CHAT_QUEUED_TIMEOUT_MS - : ON_CHAT_PENDING_START_TIMEOUT_MS; - - if (now - transient.lastEventAt > timeoutMs) { - stalled = true; - console.warn(`[WorkspaceStore] onChat stalled for ${workspaceId}; restarting...`); - attemptController.abort(); - } - }, ON_CHAT_STALL_CHECK_MS); + const watchdog = this.startOnChatWatchdog(workspaceId, { + signal, + attemptSignal, + abortAttempt, + }); try { const iterator = await client.workspace.onChat({ workspaceId }, { signal: attemptSignal }); @@ -1377,7 +1410,7 @@ export class WorkspaceStore { return; } - if (!stalled) { + if (!watchdog.didStall()) { console.warn( `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` ); @@ -1392,7 +1425,7 @@ export class WorkspaceStore { // 1. Schema validation fails (event doesn't match WorkspaceChatMessageSchema) // 2. Workspace was removed on server side (iterator ends with error) // 3. Connection dropped (WebSocket/MessagePort error) - if (attemptSignal.aborted && stalled) { + if (attemptSignal.aborted && watchdog.didStall()) { // Watchdog requested restart; skip noisy logging. } else if (isIteratorValidationFailed(error)) { // Only suppress if workspace no longer exists (was removed during the race) @@ -1407,7 +1440,7 @@ export class WorkspaceStore { console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); } } finally { - clearInterval(watchdog); + watchdog.stop(); signal.removeEventListener("abort", abortAttempt); } From 3982a7e93c0f919581f1eb79bcb42405dec0c673 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 18 Jan 2026 10:37:05 -0600 Subject: [PATCH 3/4] fix: guard outbound note for missing workspaces --- src/browser/stores/WorkspaceStore.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 4c40e5abb4..3f450b0b98 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1693,7 +1693,10 @@ export class WorkspaceStore { * Record that the user sent a message so we can detect stalled onChat streams. */ noteOutboundMessage(workspaceId: string): void { - const transient = this.assertChatTransientState(workspaceId); + const transient = this.chatTransientState.get(workspaceId); + if (!transient) { + return; + } const now = Date.now(); transient.lastOutboundAt = now; transient.lastEventAt = now; From d4ac2ff99bab9112e78c8bda20e9f545eb3e7e25 Mon Sep 17 00:00:00 2001 From: Ammar Date: Sun, 18 Jan 2026 10:46:46 -0600 Subject: [PATCH 4/4] fix: abort onChat attempt when parent canceled --- src/browser/stores/WorkspaceStore.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 3f450b0b98..43c3b2bcd8 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -1376,6 +1376,11 @@ export class WorkspaceStore { const { signal: attemptSignal } = attemptController; const abortAttempt = () => attemptController.abort(); signal.addEventListener("abort", abortAttempt, { once: true }); + if (signal.aborted) { + abortAttempt(); + signal.removeEventListener("abort", abortAttempt); + return; + } const watchdog = this.startOnChatWatchdog(workspaceId, { signal,