Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/browser/components/ChatPane.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,8 @@ export const ChatPane: React.FC<ChatPaneProps> = (props) => {
}, [setEditingMessage]);

const handleMessageSent = useCallback(() => {
storeRaw.noteOutboundMessage(workspaceId);

// Auto-background any running foreground bash when user sends a new message
// This prevents the user from waiting for the bash to complete before their message is processed
autoBackgroundOnSend();
Expand All @@ -384,7 +386,7 @@ export const ChatPane: React.FC<ChatPaneProps> = (props) => {
// Reset autoRetry when user sends a message
// User action = clear intent: "I'm actively using this workspace"
setAutoRetry(true);
}, [setAutoScroll, setAutoRetry, autoBackgroundOnSend]);
}, [storeRaw, workspaceId, setAutoScroll, setAutoRetry, autoBackgroundOnSend]);

const handleClearHistory = useCallback(
async (percentage = 1.0) => {
Expand Down
144 changes: 138 additions & 6 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ interface WorkspaceChatTransientState {
replayingHistory: boolean;
queuedMessage: QueuedMessage | null;
liveBashOutput: Map<string, LiveBashOutputInternal>;
lastEventAt: number;
lastOutboundAt: number | null;
}

function createInitialChatTransientState(): WorkspaceChatTransientState {
Expand All @@ -214,9 +216,16 @@ function createInitialChatTransientState(): WorkspaceChatTransientState {
replayingHistory: false,
queuedMessage: null,
liveBashOutput: new Map(),
lastEventAt: Date.now(),
lastOutboundAt: null,
};
}

// Watchdog silence budgets, selected by getOnChatWatchdogExpectation():
// - pending start: used when there is neither an active stream nor a queued message
// - queued: used when a message is queued and no stream is active
// - active stream: used whenever at least one stream is active
const ON_CHAT_PENDING_START_TIMEOUT_MS = 15 * 1000;
const ON_CHAT_QUEUED_TIMEOUT_MS = 5 * 60 * 1000;
const ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS = 60 * 1000;
// Period of the setInterval tick that performs the stall check.
const ON_CHAT_STALL_CHECK_MS = 5 * 1000;

// Retry delay bounds for the onChat subscription.
// NOTE(review): presumably consumed by calculateOnChatBackoffMs (not visible here) - confirm.
const ON_CHAT_RETRY_BASE_MS = 250;
const ON_CHAT_RETRY_MAX_MS = 5 * 1000;

Expand Down Expand Up @@ -1265,12 +1274,91 @@ export class WorkspaceStore {
aggregator.clear();

// Reset per-workspace transient state so the next replay rebuilds from the backend source of truth.
this.chatTransientState.set(workspaceId, createInitialChatTransientState());
// Preserve outbound send expectations so stalled subscriptions keep retrying until events arrive.
const previousOutboundAt = this.chatTransientState.get(workspaceId)?.lastOutboundAt ?? null;
const nextTransient = createInitialChatTransientState();
nextTransient.lastOutboundAt = previousOutboundAt;
this.chatTransientState.set(workspaceId, nextTransient);

this.states.bump(workspaceId);
this.checkAndBumpRecencyIfChanged();
}

/**
 * Decide whether the onChat watchdog should currently expect events for a
 * workspace, and which silence budget applies.
 *
 * @param workspaceId Workspace whose aggregator and transient state are inspected.
 * @returns The timeout to apply together with the workspace's transient state,
 *   or `null` when the workspace has no aggregator / transient state or when
 *   nothing is in flight (no active stream, no queued message, no pending
 *   stream start, and no recorded outbound message).
 */
private getOnChatWatchdogExpectation(
  workspaceId: string
): { timeoutMs: number; transient: WorkspaceChatTransientState } | null {
  const agg = this.aggregators.get(workspaceId);
  const chatState = this.chatTransientState.get(workspaceId);
  if (!agg || !chatState) {
    return null;
  }

  const streaming = agg.getActiveStreams().length > 0;
  const queued = chatState.queuedMessage !== null;
  const awaitingStart = agg.getPendingStreamStartTime() !== null;
  const sentRecently = chatState.lastOutboundAt !== null;

  // Priority order: an active stream wins over a queued message, which wins
  // over merely waiting for a stream to start or for a reply to a send.
  if (streaming) {
    return { timeoutMs: ON_CHAT_ACTIVE_STREAM_TIMEOUT_MS, transient: chatState };
  }
  if (queued) {
    return { timeoutMs: ON_CHAT_QUEUED_TIMEOUT_MS, transient: chatState };
  }
  if (awaitingStart || sentRecently) {
    return { timeoutMs: ON_CHAT_PENDING_START_TIMEOUT_MS, transient: chatState };
  }

  // Idle workspace: silence is normal, so there is nothing to watch for.
  return null;
}

/**
 * Start a periodic stall check for one onChat subscription attempt.
 *
 * Every ON_CHAT_STALL_CHECK_MS the watchdog asks
 * getOnChatWatchdogExpectation() whether events are expected. If they are and
 * `lastEventAt` is older than the returned timeout, it marks the attempt as
 * stalled, logs a warning, and calls `options.abortAttempt()`.
 *
 * @param workspaceId Workspace being watched.
 * @param options.signal Subscription-level abort signal; ticks are no-ops once aborted.
 * @param options.attemptSignal Abort signal of the current attempt; ticks are no-ops once aborted.
 * @param options.abortAttempt Callback invoked when a stall is detected.
 * @returns `didStall` reports whether this watchdog detected a stall;
 *   `stop` clears the interval.
 */
private startOnChatWatchdog(
  workspaceId: string,
  options: {
    signal: AbortSignal;
    attemptSignal: AbortSignal;
    abortAttempt: () => void;
  }
): { didStall: () => boolean; stop: () => void } {
  let stallDetected = false;
  let expectingEvents = false;

  const tick = (): void => {
    const cancelled = options.signal.aborted || options.attemptSignal.aborted;
    if (cancelled) {
      return;
    }

    const expectation = this.getOnChatWatchdogExpectation(workspaceId);
    if (!expectation) {
      expectingEvents = false;
      return;
    }

    const currentTime = Date.now();

    if (!expectingEvents) {
      // First tick after becoming "expecting": restart the silence clock so
      // time spent idle beforehand is not counted as a stall.
      expectingEvents = true;
      expectation.transient.lastEventAt = currentTime;
      return;
    }

    const silentForMs = currentTime - expectation.transient.lastEventAt;
    if (silentForMs > expectation.timeoutMs) {
      stallDetected = true;
      console.warn(`[WorkspaceStore] onChat stalled for ${workspaceId}; restarting...`);
      options.abortAttempt();
    }
  };

  const intervalHandle = setInterval(tick, ON_CHAT_STALL_CHECK_MS);

  return {
    didStall: () => stallDetected,
    stop: () => clearInterval(intervalHandle),
  };
}

/**
* Subscribe to workspace chat events (history replay + live streaming).
* Retries on unexpected iterator termination to avoid requiring a full app restart.
Expand All @@ -1284,8 +1372,24 @@ export class WorkspaceStore {
return;
}

const attemptController = new AbortController();
const { signal: attemptSignal } = attemptController;
const abortAttempt = () => attemptController.abort();
signal.addEventListener("abort", abortAttempt, { once: true });
if (signal.aborted) {
abortAttempt();
signal.removeEventListener("abort", abortAttempt);
return;
}

const watchdog = this.startOnChatWatchdog(workspaceId, {
signal,
attemptSignal,
abortAttempt,
});

try {
const iterator = await client.workspace.onChat({ workspaceId }, { signal });
const iterator = await client.workspace.onChat({ workspaceId }, { signal: attemptSignal });

for await (const data of iterator) {
if (signal.aborted) {
Expand All @@ -1295,6 +1399,12 @@ export class WorkspaceStore {
// Connection is alive again - don't carry old backoff into the next failure.
attempt = 0;

const transient = this.chatTransientState.get(workspaceId);
if (transient) {
transient.lastEventAt = Date.now();
transient.lastOutboundAt = null;
}

queueMicrotask(() => {
this.handleChatMessage(workspaceId, data);
});
Expand All @@ -1305,9 +1415,11 @@ export class WorkspaceStore {
return;
}

console.warn(
`[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...`
);
if (!watchdog.didStall()) {
console.warn(
`[WorkspaceStore] onChat subscription ended unexpectedly for ${workspaceId}; retrying...`
);
}
} catch (error) {
// Suppress errors when subscription was intentionally cleaned up
if (signal.aborted) {
Expand All @@ -1318,7 +1430,9 @@ export class WorkspaceStore {
// 1. Schema validation fails (event doesn't match WorkspaceChatMessageSchema)
// 2. Workspace was removed on server side (iterator ends with error)
// 3. Connection dropped (WebSocket/MessagePort error)
if (isIteratorValidationFailed(error)) {
if (attemptSignal.aborted && watchdog.didStall()) {
// Watchdog requested restart; skip noisy logging.
} else if (isIteratorValidationFailed(error)) {
// Only suppress if workspace no longer exists (was removed during the race)
if (!this.isWorkspaceSubscribed(workspaceId)) {
return;
Expand All @@ -1330,6 +1444,9 @@ export class WorkspaceStore {
} else {
console.error(`[WorkspaceStore] Error in onChat subscription for ${workspaceId}:`, error);
}
} finally {
watchdog.stop();
signal.removeEventListener("abort", abortAttempt);
}

const delayMs = calculateOnChatBackoffMs(attempt);
Expand Down Expand Up @@ -1577,6 +1694,19 @@ export class WorkspaceStore {
this.fileModifyingToolSubs.bump(workspaceId);
}

/**
 * Mark that the user just sent a message in this workspace.
 *
 * Stamps both `lastOutboundAt` and `lastEventAt` with the current time so the
 * onChat stall detection measures silence from the moment of the send.
 * Does nothing when the workspace has no transient chat state.
 *
 * @param workspaceId Workspace the message was sent from.
 */
noteOutboundMessage(workspaceId: string): void {
  const chatState = this.chatTransientState.get(workspaceId);
  if (chatState) {
    const sentAt = Date.now();
    chatState.lastOutboundAt = sentAt;
    chatState.lastEventAt = sentAt;
  }
}

// Private methods

/**
Expand Down Expand Up @@ -1624,6 +1754,8 @@ export class WorkspaceStore {
const aggregator = this.assertGet(workspaceId);

const transient = this.assertChatTransientState(workspaceId);
transient.lastEventAt = Date.now();
transient.lastOutboundAt = null;

if (isCaughtUpMessage(data)) {
// Check if there's an active stream in buffered events (reconnection scenario)
Expand Down
Loading