From e024f22aa888f28dc5b0a72853ca057776d4f942 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Sat, 14 Mar 2026 18:43:19 -0700 Subject: [PATCH 1/2] Fix redis queuing and run --- apps/sim/app/api/copilot/chat/route.ts | 28 +++ .../app/workspace/[workspaceId]/home/home.tsx | 3 +- .../[workspaceId]/home/hooks/use-chat.ts | 167 ++++++++++++------ .../utils/workflow-execution-utils.ts | 113 ++++++------ apps/sim/hooks/queries/tasks.ts | 7 + .../orchestrator/sse/handlers/handlers.ts | 28 --- 6 files changed, 214 insertions(+), 132 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index fe93d10b5f4..e4baedb1e02 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -454,6 +454,33 @@ export async function GET(req: NextRequest) { return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) } + let streamSnapshot: { + events: Array<{ eventId: number; streamId: string; event: Record }> + status: string + } | null = null + + if (chat.conversationId) { + try { + const { getStreamMeta, readStreamEvents } = await import( + '@/lib/copilot/orchestrator/stream/buffer' + ) + const [meta, events] = await Promise.all([ + getStreamMeta(chat.conversationId), + readStreamEvents(chat.conversationId, 0), + ]) + streamSnapshot = { + events: events || [], + status: meta?.status || 'unknown', + } + } catch (err) { + logger.warn('Failed to read stream snapshot for chat', { + chatId, + conversationId: chat.conversationId, + error: err instanceof Error ? err.message : String(err), + }) + } + } + const transformedChat = { id: chat.id, title: chat.title, @@ -466,6 +493,7 @@ export async function GET(req: NextRequest) { resources: Array.isArray(chat.resources) ? chat.resources : [], createdAt: chat.createdAt, updatedAt: chat.updatedAt, + ...(streamSnapshot ? { streamSnapshot } : {}), } logger.info(`Retrieved chat ${chatId}`) diff --git a/apps/sim/app/workspace/[workspaceId]/home/home.tsx b/apps/sim/app/workspace/[workspaceId]/home/home.tsx index 64ef65819f6..5bfda4b7f78 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/home.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/home.tsx @@ -178,6 +178,7 @@ export function Home({ chatId }: HomeProps = {}) { const { messages, isSending, + isReconnecting, sendMessage, stopGeneration, resolvedChatId, @@ -330,7 +331,7 @@ export function Home({ chatId }: HomeProps = {}) { return () => ro.disconnect() }, [hasMessages]) - if (!hasMessages && chatId && isLoadingHistory) { + if (chatId && (isLoadingHistory || isReconnecting)) { return ( ([]) const [isSending, setIsSending] = useState(false) + const [isReconnecting, setIsReconnecting] = useState(false) const [error, setError] = useState(null) const [resolvedChatId, setResolvedChatId] = useState(initialChatId) const [resources, setResources] = useState([]) @@ -268,6 +270,10 @@ export function useChat( }, [messageQueue]) const sendMessageRef = useRef(async () => {}) + const processSSEStreamRef = useRef< + (reader: ReadableStreamDefaultReader, assistantId: string) => Promise + >(async () => {}) + const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) const abortControllerRef = useRef(null) const chatIdRef = useRef(initialChatId) @@ -329,6 +335,7 @@ export function useChat( setMessages([]) setError(null) setIsSending(false) + setIsReconnecting(false) setResources([]) setActiveResourceId(null) setMessageQueue([]) @@ -346,6 +353,7 @@ export function useChat( setMessages([]) setError(null) setIsSending(false) + setIsReconnecting(false) setResources([]) setActiveResourceId(null) setMessageQueue([]) @@ -365,6 +373,95 @@ export function useChat( ensureWorkflowInRegistry(resource.id, resource.title, workspaceId) } } + + // Kick off stream reconnection immediately if there's an active stream. + // The stream snapshot was fetched in parallel with the chat history (same + // API call), so there's no extra round-trip. + const activeStreamId = chatHistory.activeStreamId + const snapshot = chatHistory.streamSnapshot + if (activeStreamId && !sendingRef.current) { + const gen = ++streamGenRef.current + const abortController = new AbortController() + abortControllerRef.current = abortController + streamIdRef.current = activeStreamId + sendingRef.current = true + setIsReconnecting(true) + + const assistantId = crypto.randomUUID() + + const reconnect = async () => { + try { + const encoder = new TextEncoder() + + const batchEvents = snapshot?.events ?? [] + const streamStatus = snapshot?.status ?? '' + + if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) { + // No snapshot available — stream buffer expired. Clean up. + const cid = chatIdRef.current + if (cid) { + fetch('/api/mothership/chat/stop', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }), + }).catch(() => {}) + } + return + } + + setIsSending(true) + setIsReconnecting(false) + + const lastEventId = + batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0 + const isStreamDone = streamStatus === 'complete' || streamStatus === 'error' + + const combinedStream = new ReadableStream({ + async start(controller) { + if (batchEvents.length > 0) { + const sseText = batchEvents + .map((e) => `data: ${JSON.stringify(e.event)}\n`) + .join('\n') + controller.enqueue(encoder.encode(`${sseText}\n`)) + } + + if (!isStreamDone) { + try { + const sseRes = await fetch( + `/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`, + { signal: abortController.signal } + ) + if (sseRes.ok && sseRes.body) { + const reader = sseRes.body.getReader() + while (true) { + const { done, value } = await reader.read() + if (done) break + controller.enqueue(value) + } + } + } catch (err) { + if (!(err instanceof Error && err.name === 'AbortError')) { + logger.warn('SSE tail failed during reconnect', err) + } + } + } + + controller.close() + }, + }) + + await processSSEStreamRef.current(combinedStream.getReader(), assistantId) + } catch (err) { + if (err instanceof Error && err.name === 'AbortError') return + } finally { + setIsReconnecting(false) + if (streamGenRef.current === gen) { + finalizeRef.current() + } + } + } + reconnect() + } }, [chatHistory, workspaceId]) useEffect(() => { @@ -405,11 +502,14 @@ export function useChat( const flush = () => { streamingBlocksRef.current = [...blocks] - setMessages((prev) => - prev.map((m) => - m.id === assistantId ? { ...m, content: runningText, contentBlocks: [...blocks] } : m - ) - ) + const snapshot = { content: runningText, contentBlocks: [...blocks] } + setMessages((prev) => { + const idx = prev.findIndex((m) => m.id === assistantId) + if (idx >= 0) { + return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m)) + } + return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }] + }) } while (true) { @@ -662,6 +762,9 @@ export function useChat( }, [workspaceId, queryClient, addResource, removeResource] ) + useLayoutEffect(() => { + processSSEStreamRef.current = processSSEStream + }) const persistPartialResponse = useCallback(async () => { const chatId = chatIdRef.current @@ -750,50 +853,9 @@ export function useChat( }, [invalidateChatQueries] ) - - useEffect(() => { - const activeStreamId = chatHistory?.activeStreamId - if (!activeStreamId || !appliedChatIdRef.current || sendingRef.current) return - - const gen = ++streamGenRef.current - const abortController = new AbortController() - abortControllerRef.current = abortController - sendingRef.current = true - setIsSending(true) - - const assistantId = crypto.randomUUID() - setMessages((prev) => [ - ...prev, - { - id: assistantId, - role: 'assistant' as const, - content: '', - contentBlocks: [], - }, - ]) - - const reconnect = async () => { - try { - const response = await fetch(`/api/copilot/chat/stream?streamId=${activeStreamId}&from=0`, { - signal: abortController.signal, - }) - if (!response.ok || !response.body) return - await processSSEStream(response.body.getReader(), assistantId) - } catch (err) { - if (err instanceof Error && err.name === 'AbortError') return - } finally { - if (streamGenRef.current === gen) { - finalize() - } - } - } - reconnect() - - return () => { - abortController.abort() - appliedChatIdRef.current = undefined - } - }, [chatHistory?.activeStreamId, processSSEStream, finalize]) + useLayoutEffect(() => { + finalizeRef.current = finalize + }) const sendMessage = useCallback( async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => { @@ -937,7 +999,11 @@ export function useChat( if (sendingRef.current) { await persistPartialResponse() } - const sid = streamIdRef.current + const sid = + streamIdRef.current || + queryClient.getQueryData(taskKeys.detail(chatIdRef.current)) + ?.activeStreamId || + undefined streamGenRef.current++ abortControllerRef.current?.abort() abortControllerRef.current = null @@ -1054,6 +1120,7 @@ export function useChat( return { messages, isSending, + isReconnecting, error, resolvedChatId, sendMessage, diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index 7f240290201..c0ca16cc9af 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -521,62 +521,69 @@ export async function executeWorkflowWithFullLogging( const data = line.substring(6).trim() if (data === '[DONE]') continue + let event: any try { - const event = JSON.parse(data) - - switch (event.type) { - case 'execution:started': { - setCurrentExecutionId(wfId, event.executionId) - executionIdRef.current = event.executionId || executionId - break - } + event = JSON.parse(data) + } catch { + continue + } - case 'block:started': - blockHandlers.onBlockStarted(event.data) - break - - case 'block:completed': - blockHandlers.onBlockCompleted(event.data) - break - - case 'block:error': - blockHandlers.onBlockError(event.data) - break - - case 'block:childWorkflowStarted': - blockHandlers.onBlockChildWorkflowStarted(event.data) - break - - case 'execution:completed': - setCurrentExecutionId(wfId, null) - executionResult = { - success: event.data.success, - output: event.data.output, - logs: [], - metadata: { - duration: event.data.duration, - startTime: event.data.startTime, - endTime: event.data.endTime, - }, - } - break - - case 'execution:cancelled': - setCurrentExecutionId(wfId, null) - executionResult = { - success: false, - output: {}, - error: 'Execution was cancelled', - logs: [], - } - break - - case 'execution:error': - setCurrentExecutionId(wfId, null) - throw new Error(event.data.error || 'Execution failed') + switch (event.type) { + case 'execution:started': { + setCurrentExecutionId(wfId, event.executionId) + executionIdRef.current = event.executionId || executionId + break } - } catch (parseError) { - // Skip malformed SSE events + + case 'block:started': + blockHandlers.onBlockStarted(event.data) + break + + case 'block:completed': + blockHandlers.onBlockCompleted(event.data) + break + + case 'block:error': + blockHandlers.onBlockError(event.data) + break + + case 'block:childWorkflowStarted': + blockHandlers.onBlockChildWorkflowStarted(event.data) + break + + case 'execution:completed': + setCurrentExecutionId(wfId, null) + executionResult = { + success: event.data.success, + output: event.data.output, + logs: [], + metadata: { + duration: event.data.duration, + startTime: event.data.startTime, + endTime: event.data.endTime, + }, + } + break + + case 'execution:cancelled': + setCurrentExecutionId(wfId, null) + executionResult = { + success: false, + output: {}, + error: 'Execution was cancelled', + logs: [], + } + break + + case 'execution:error': + setCurrentExecutionId(wfId, null) + executionResult = { + success: false, + output: {}, + error: event.data.error || 'Execution failed', + logs: [], + } + break } } } diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 2f006db1bd8..198d59701c2 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -9,12 +9,18 @@ export interface TaskMetadata { isUnread: boolean } +export interface StreamSnapshot { + events: Array<{ eventId: number; streamId: string; event: Record }> + status: string +} + export interface TaskChatHistory { id: string title: string | null messages: TaskStoredMessage[] activeStreamId: string | null resources: MothershipResource[] + streamSnapshot?: StreamSnapshot | null } export interface TaskStoredToolCall { @@ -135,6 +141,7 @@ async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise = { } if (options.interactive === false) { - if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) { - toolCall.status = 'executing' - const completion = await waitForToolCompletion( - toolCallId, - options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal - ) - handleClientCompletion(toolCall, toolCallId, completion) - await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options) - return - } if (options.autoExecuteTools !== false) { fireToolExecution() } @@ -580,17 +563,6 @@ export const subAgentHandlers: Record = { } if (options.interactive === false) { - if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) { - toolCall.status = 'executing' - const completion = await waitForToolCompletion( - toolCallId, - options.timeout || STREAM_TIMEOUT_MS, - options.abortSignal - ) - handleClientCompletion(toolCall, toolCallId, completion) - await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options) - return - } if (options.autoExecuteTools !== false) { fireToolExecution() } From 34283b551f903a32392be1c2df1d00764f6a58e8 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Sat, 14 Mar 2026 18:54:37 -0700 Subject: [PATCH 2/2] Fix dynimp --- apps/sim/app/api/copilot/chat/route.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index e4baedb1e02..57e6575d6ce 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -14,6 +14,7 @@ import { } from '@/lib/copilot/chat-streaming' import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models' import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator' +import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer' import { authenticateCopilotRequestSessionOnly, createBadRequestResponse, @@ -461,9 +462,6 @@ export async function GET(req: NextRequest) { if (chat.conversationId) { try { - const { getStreamMeta, readStreamEvents } = await import( - '@/lib/copilot/orchestrator/stream/buffer' - ) const [meta, events] = await Promise.all([ getStreamMeta(chat.conversationId), readStreamEvents(chat.conversationId, 0),