Skip to content

Commit e024f22

Browse files
committed
Fix redis queuing and run
1 parent 8837f14 commit e024f22

File tree

6 files changed

+214
-132
lines changed

6 files changed

+214
-132
lines changed

apps/sim/app/api/copilot/chat/route.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,33 @@ export async function GET(req: NextRequest) {
454454
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
455455
}
456456

457+
let streamSnapshot: {
458+
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
459+
status: string
460+
} | null = null
461+
462+
if (chat.conversationId) {
463+
try {
464+
const { getStreamMeta, readStreamEvents } = await import(
465+
'@/lib/copilot/orchestrator/stream/buffer'
466+
)
467+
const [meta, events] = await Promise.all([
468+
getStreamMeta(chat.conversationId),
469+
readStreamEvents(chat.conversationId, 0),
470+
])
471+
streamSnapshot = {
472+
events: events || [],
473+
status: meta?.status || 'unknown',
474+
}
475+
} catch (err) {
476+
logger.warn('Failed to read stream snapshot for chat', {
477+
chatId,
478+
conversationId: chat.conversationId,
479+
error: err instanceof Error ? err.message : String(err),
480+
})
481+
}
482+
}
483+
457484
const transformedChat = {
458485
id: chat.id,
459486
title: chat.title,
@@ -466,6 +493,7 @@ export async function GET(req: NextRequest) {
466493
resources: Array.isArray(chat.resources) ? chat.resources : [],
467494
createdAt: chat.createdAt,
468495
updatedAt: chat.updatedAt,
496+
...(streamSnapshot ? { streamSnapshot } : {}),
469497
}
470498

471499
logger.info(`Retrieved chat ${chatId}`)

apps/sim/app/workspace/[workspaceId]/home/home.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ export function Home({ chatId }: HomeProps = {}) {
178178
const {
179179
messages,
180180
isSending,
181+
isReconnecting,
181182
sendMessage,
182183
stopGeneration,
183184
resolvedChatId,
@@ -330,7 +331,7 @@ export function Home({ chatId }: HomeProps = {}) {
330331
return () => ro.disconnect()
331332
}, [hasMessages])
332333

333-
if (!hasMessages && chatId && isLoadingHistory) {
334+
if (chatId && (isLoadingHistory || isReconnecting)) {
334335
return (
335336
<ChatSkeleton>
336337
<UserInput

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 117 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import type {
4545
export interface UseChatReturn {
4646
messages: ChatMessage[]
4747
isSending: boolean
48+
isReconnecting: boolean
4849
error: string | null
4950
resolvedChatId: string | undefined
5051
sendMessage: (
@@ -250,6 +251,7 @@ export function useChat(
250251
const queryClient = useQueryClient()
251252
const [messages, setMessages] = useState<ChatMessage[]>([])
252253
const [isSending, setIsSending] = useState(false)
254+
const [isReconnecting, setIsReconnecting] = useState(false)
253255
const [error, setError] = useState<string | null>(null)
254256
const [resolvedChatId, setResolvedChatId] = useState<string | undefined>(initialChatId)
255257
const [resources, setResources] = useState<MothershipResource[]>([])
@@ -268,6 +270,10 @@ export function useChat(
268270
}, [messageQueue])
269271

270272
const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
273+
const processSSEStreamRef = useRef<
274+
(reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => Promise<void>
275+
>(async () => {})
276+
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
271277

272278
const abortControllerRef = useRef<AbortController | null>(null)
273279
const chatIdRef = useRef<string | undefined>(initialChatId)
@@ -329,6 +335,7 @@ export function useChat(
329335
setMessages([])
330336
setError(null)
331337
setIsSending(false)
338+
setIsReconnecting(false)
332339
setResources([])
333340
setActiveResourceId(null)
334341
setMessageQueue([])
@@ -346,6 +353,7 @@ export function useChat(
346353
setMessages([])
347354
setError(null)
348355
setIsSending(false)
356+
setIsReconnecting(false)
349357
setResources([])
350358
setActiveResourceId(null)
351359
setMessageQueue([])
@@ -365,6 +373,95 @@ export function useChat(
365373
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
366374
}
367375
}
376+
377+
// Kick off stream reconnection immediately if there's an active stream.
378+
// The stream snapshot was fetched in parallel with the chat history (same
379+
// API call), so there's no extra round-trip.
380+
const activeStreamId = chatHistory.activeStreamId
381+
const snapshot = chatHistory.streamSnapshot
382+
if (activeStreamId && !sendingRef.current) {
383+
const gen = ++streamGenRef.current
384+
const abortController = new AbortController()
385+
abortControllerRef.current = abortController
386+
streamIdRef.current = activeStreamId
387+
sendingRef.current = true
388+
setIsReconnecting(true)
389+
390+
const assistantId = crypto.randomUUID()
391+
392+
const reconnect = async () => {
393+
try {
394+
const encoder = new TextEncoder()
395+
396+
const batchEvents = snapshot?.events ?? []
397+
const streamStatus = snapshot?.status ?? ''
398+
399+
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
400+
// No snapshot available — stream buffer expired. Clean up.
401+
const cid = chatIdRef.current
402+
if (cid) {
403+
fetch('/api/mothership/chat/stop', {
404+
method: 'POST',
405+
headers: { 'Content-Type': 'application/json' },
406+
body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }),
407+
}).catch(() => {})
408+
}
409+
return
410+
}
411+
412+
setIsSending(true)
413+
setIsReconnecting(false)
414+
415+
const lastEventId =
416+
batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
417+
const isStreamDone = streamStatus === 'complete' || streamStatus === 'error'
418+
419+
const combinedStream = new ReadableStream<Uint8Array>({
420+
async start(controller) {
421+
if (batchEvents.length > 0) {
422+
const sseText = batchEvents
423+
.map((e) => `data: ${JSON.stringify(e.event)}\n`)
424+
.join('\n')
425+
controller.enqueue(encoder.encode(`${sseText}\n`))
426+
}
427+
428+
if (!isStreamDone) {
429+
try {
430+
const sseRes = await fetch(
431+
`/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
432+
{ signal: abortController.signal }
433+
)
434+
if (sseRes.ok && sseRes.body) {
435+
const reader = sseRes.body.getReader()
436+
while (true) {
437+
const { done, value } = await reader.read()
438+
if (done) break
439+
controller.enqueue(value)
440+
}
441+
}
442+
} catch (err) {
443+
if (!(err instanceof Error && err.name === 'AbortError')) {
444+
logger.warn('SSE tail failed during reconnect', err)
445+
}
446+
}
447+
}
448+
449+
controller.close()
450+
},
451+
})
452+
453+
await processSSEStreamRef.current(combinedStream.getReader(), assistantId)
454+
} catch (err) {
455+
if (err instanceof Error && err.name === 'AbortError') return
456+
} finally {
457+
setIsReconnecting(false)
458+
if (streamGenRef.current === gen) {
459+
finalizeRef.current()
460+
}
461+
}
462+
}
463+
reconnect()
464+
}
368465
}, [chatHistory, workspaceId])
369466

370467
useEffect(() => {
@@ -405,11 +502,14 @@ export function useChat(
405502

406503
const flush = () => {
407504
streamingBlocksRef.current = [...blocks]
408-
setMessages((prev) =>
409-
prev.map((m) =>
410-
m.id === assistantId ? { ...m, content: runningText, contentBlocks: [...blocks] } : m
411-
)
412-
)
505+
const snapshot = { content: runningText, contentBlocks: [...blocks] }
506+
setMessages((prev) => {
507+
const idx = prev.findIndex((m) => m.id === assistantId)
508+
if (idx >= 0) {
509+
return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
510+
}
511+
return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }]
512+
})
413513
}
414514

415515
while (true) {
@@ -662,6 +762,9 @@ export function useChat(
662762
},
663763
[workspaceId, queryClient, addResource, removeResource]
664764
)
765+
useLayoutEffect(() => {
766+
processSSEStreamRef.current = processSSEStream
767+
})
665768

666769
const persistPartialResponse = useCallback(async () => {
667770
const chatId = chatIdRef.current
@@ -750,50 +853,9 @@ export function useChat(
750853
},
751854
[invalidateChatQueries]
752855
)
753-
754-
useEffect(() => {
755-
const activeStreamId = chatHistory?.activeStreamId
756-
if (!activeStreamId || !appliedChatIdRef.current || sendingRef.current) return
757-
758-
const gen = ++streamGenRef.current
759-
const abortController = new AbortController()
760-
abortControllerRef.current = abortController
761-
sendingRef.current = true
762-
setIsSending(true)
763-
764-
const assistantId = crypto.randomUUID()
765-
setMessages((prev) => [
766-
...prev,
767-
{
768-
id: assistantId,
769-
role: 'assistant' as const,
770-
content: '',
771-
contentBlocks: [],
772-
},
773-
])
774-
775-
const reconnect = async () => {
776-
try {
777-
const response = await fetch(`/api/copilot/chat/stream?streamId=${activeStreamId}&from=0`, {
778-
signal: abortController.signal,
779-
})
780-
if (!response.ok || !response.body) return
781-
await processSSEStream(response.body.getReader(), assistantId)
782-
} catch (err) {
783-
if (err instanceof Error && err.name === 'AbortError') return
784-
} finally {
785-
if (streamGenRef.current === gen) {
786-
finalize()
787-
}
788-
}
789-
}
790-
reconnect()
791-
792-
return () => {
793-
abortController.abort()
794-
appliedChatIdRef.current = undefined
795-
}
796-
}, [chatHistory?.activeStreamId, processSSEStream, finalize])
856+
useLayoutEffect(() => {
857+
finalizeRef.current = finalize
858+
})
797859

798860
const sendMessage = useCallback(
799861
async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => {
@@ -937,7 +999,11 @@ export function useChat(
937999
if (sendingRef.current) {
9381000
await persistPartialResponse()
9391001
}
940-
const sid = streamIdRef.current
1002+
const sid =
1003+
streamIdRef.current ||
1004+
queryClient.getQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current))
1005+
?.activeStreamId ||
1006+
undefined
9411007
streamGenRef.current++
9421008
abortControllerRef.current?.abort()
9431009
abortControllerRef.current = null
@@ -1054,6 +1120,7 @@ export function useChat(
10541120
return {
10551121
messages,
10561122
isSending,
1123+
isReconnecting,
10571124
error,
10581125
resolvedChatId,
10591126
sendMessage,

0 commit comments

Comments
 (0)