diff --git a/src/browser/components/ChatPane.tsx b/src/browser/components/ChatPane.tsx index 086ac47693..7e772a0659 100644 --- a/src/browser/components/ChatPane.tsx +++ b/src/browser/components/ChatPane.tsx @@ -374,6 +374,8 @@ export const ChatPane: React.FC = (props) => { }, [setEditingMessage]); const handleMessageSent = useCallback(() => { + storeRaw.noteOutboundMessage(workspaceId); + // Auto-background any running foreground bash when user sends a new message // This prevents the user from waiting for the bash to complete before their message is processed autoBackgroundOnSend(); @@ -384,7 +386,7 @@ export const ChatPane: React.FC = (props) => { // Reset autoRetry when user sends a message // User action = clear intent: "I'm actively using this workspace" setAutoRetry(true); - }, [setAutoScroll, setAutoRetry, autoBackgroundOnSend]); + }, [storeRaw, workspaceId, setAutoScroll, setAutoRetry, autoBackgroundOnSend]); const handleClearHistory = useCallback( async (percentage = 1.0) => { diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 77987cd7ef..43c3b2bcd8 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -204,6 +204,8 @@ interface WorkspaceChatTransientState { replayingHistory: boolean; queuedMessage: QueuedMessage | null; liveBashOutput: Map; + lastEventAt: number; + lastOutboundAt: number | null; } function createInitialChatTransientState(): WorkspaceChatTransientState { @@ -214,9 +216,16 @@ function createInitialChatTransientState(): WorkspaceChatTransientState { replayingHistory: false, queuedMessage: null, liveBashOutput: new Map(), + lastEventAt: Date.now(), + lastOutboundAt: null, }; } +const ON_CHAT_PENDING_START_TIMEOUT_MS = 15000; +const ON_CHAT_QUEUED_TIMEOUT_MS = 300000; +const ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS = 60000; +const ON_CHAT_STALL_CHECK_MS = 5000; + const ON_CHAT_RETRY_BASE_MS = 250; const ON_CHAT_RETRY_MAX_MS = 5000; @@ -1265,12 +1274,91 @@ export class WorkspaceStore { aggregator.clear(); // Reset per-workspace transient state so the next replay rebuilds from the backend source of truth. - this.chatTransientState.set(workspaceId, createInitialChatTransientState()); + // Preserve outbound send expectations so stalled subscriptions keep retrying until events arrive. + const previousOutboundAt = this.chatTransientState.get(workspaceId)?.lastOutboundAt ?? null; + const nextTransient = createInitialChatTransientState(); + nextTransient.lastOutboundAt = previousOutboundAt; + this.chatTransientState.set(workspaceId, nextTransient); this.states.bump(workspaceId); this.checkAndBumpRecencyIfChanged(); } + private getOnChatWatchdogExpectation( + workspaceId: string + ): { timeoutMs: number; transient: WorkspaceChatTransientState } | null { + const aggregator = this.aggregators.get(workspaceId); + const transient = this.chatTransientState.get(workspaceId); + if (!aggregator || !transient) { + return null; + } + + const hasActiveStream = aggregator.getActiveStreams().length > 0; + const hasQueuedMessage = transient.queuedMessage !== null; + const pendingStreamStartTime = aggregator.getPendingStreamStartTime(); + const waitingForStreamStart = pendingStreamStartTime !== null && !hasQueuedMessage; + const hasOutbound = transient.lastOutboundAt !== null; + + const shouldExpect = + hasActiveStream || hasQueuedMessage || waitingForStreamStart || hasOutbound; + + if (!shouldExpect) { + return null; + } + + const timeoutMs = hasActiveStream + ? ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS + : hasQueuedMessage + ? ON_CHAT_QUEUED_TIMEOUT_MS + : ON_CHAT_PENDING_START_TIMEOUT_MS; + + return { timeoutMs, transient }; + } + + private startOnChatWatchdog( + workspaceId: string, + options: { + signal: AbortSignal; + attemptSignal: AbortSignal; + abortAttempt: () => void; + } + ): { didStall: () => boolean; stop: () => void } { + let stalled = false; + let wasExpecting = false; + + const watchdog = setInterval(() => { + if (options.signal.aborted || options.attemptSignal.aborted) { + return; + } + + const expectation = this.getOnChatWatchdogExpectation(workspaceId); + if (!expectation) { + wasExpecting = false; + return; + } + + const now = Date.now(); + const { timeoutMs, transient } = expectation; + + if (!wasExpecting) { + wasExpecting = true; + transient.lastEventAt = now; + return; + } + + if (now - transient.lastEventAt > timeoutMs) { + stalled = true; + console.warn(`[WorkspaceStore] onChat stalled for ${workspaceId}; restarting...`); + options.abortAttempt(); + } + }, ON_CHAT_STALL_CHECK_MS); + + return { + didStall: () => stalled, + stop: () => clearInterval(watchdog), + }; + } + /** * Subscribe to workspace chat events (history replay + live streaming). * Retries on unexpected iterator termination to avoid requiring a full app restart. @@ -1284,8 +1372,24 @@ export class WorkspaceStore { return; } + const attemptController = new AbortController(); + const { signal: attemptSignal } = attemptController; + const abortAttempt = () => attemptController.abort(); + signal.addEventListener("abort", abortAttempt, { once: true }); + if (signal.aborted) { + abortAttempt(); + signal.removeEventListener("abort", abortAttempt); + return; + } + + const watchdog = this.startOnChatWatchdog(workspaceId, { + signal, + attemptSignal, + abortAttempt, + }); + try { - const iterator = await client.workspace.onChat({ workspaceId }, { signal }); + const iterator = await client.workspace.onChat({ workspaceId }, { signal: attemptSignal }); for await (const data of iterator) { if (signal.aborted) { @@ -1295,6 +1399,12 @@ export class WorkspaceStore { // Connection is alive again - don't carry old backoff into the next failure. attempt = 0; + const transient = this.chatTransientState.get(workspaceId); + if (transient) { + transient.lastEventAt = Date.now(); + transient.lastOutboundAt = null; + } + queueMicrotask(() => { this.handleChatMessage(workspaceId, data); }); @@ -1305,9 +1415,11 @@ export class WorkspaceStore { return; } - console.warn( - `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` - ); + if (!watchdog.didStall()) { + console.warn( + `[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...` + ); + } } catch (error) { // Suppress errors when subscription was intentionally cleaned up if (signal.aborted) { @@ -1318,7 +1430,9 @@ export class WorkspaceStore { // 1. Schema validation fails (event doesn't match WorkspaceChatMessageSchema) // 2. Workspace was removed on server side (iterator ends with error) // 3. Connection dropped (WebSocket/MessagePort error) - if (isIteratorValidationFailed(error)) { + if (attemptSignal.aborted && watchdog.didStall()) { + // Watchdog requested restart; skip noisy logging. + } else if (isIteratorValidationFailed(error)) { // Only suppress if workspace no longer exists (was removed during the race) if (!this.isWorkspaceSubscribed(workspaceId)) { return; @@ -1330,6 +1444,9 @@ export class WorkspaceStore { } else { console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error); } + } finally { + watchdog.stop(); + signal.removeEventListener("abort", abortAttempt); } const delayMs = calculateOnChatBackoffMs(attempt); @@ -1577,6 +1694,19 @@ export class WorkspaceStore { this.fileModifyingToolSubs.bump(workspaceId); } + /** + * Record that the user sent a message so we can detect stalled onChat streams. + */ + noteOutboundMessage(workspaceId: string): void { + const transient = this.chatTransientState.get(workspaceId); + if (!transient) { + return; + } + const now = Date.now(); + transient.lastOutboundAt = now; + transient.lastEventAt = now; + } + // Private methods /** @@ -1624,6 +1754,8 @@ export class WorkspaceStore { const aggregator = this.assertGet(workspaceId); const transient = this.assertChatTransientState(workspaceId); + transient.lastEventAt = Date.now(); + transient.lastOutboundAt = null; if (isCaughtUpMessage(data)) { // Check if there's an active stream in buffered events (reconnection scenario)