Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions apps/sim/app/api/copilot/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from '@/lib/copilot/chat-streaming'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
Expand Down Expand Up @@ -454,6 +455,30 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}

let streamSnapshot: {
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
status: string
} | null = null

if (chat.conversationId) {
try {
const [meta, events] = await Promise.all([
getStreamMeta(chat.conversationId),
readStreamEvents(chat.conversationId, 0),
])
streamSnapshot = {
events: events || [],
status: meta?.status || 'unknown',
}
} catch (err) {
logger.warn('Failed to read stream snapshot for chat', {
chatId,
conversationId: chat.conversationId,
error: err instanceof Error ? err.message : String(err),
})
}
}

const transformedChat = {
id: chat.id,
title: chat.title,
Expand All @@ -466,6 +491,7 @@ export async function GET(req: NextRequest) {
resources: Array.isArray(chat.resources) ? chat.resources : [],
createdAt: chat.createdAt,
updatedAt: chat.updatedAt,
...(streamSnapshot ? { streamSnapshot } : {}),
}

logger.info(`Retrieved chat ${chatId}`)
Expand Down
3 changes: 2 additions & 1 deletion apps/sim/app/workspace/[workspaceId]/home/home.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ export function Home({ chatId }: HomeProps = {}) {
const {
messages,
isSending,
isReconnecting,
sendMessage,
stopGeneration,
resolvedChatId,
Expand Down Expand Up @@ -330,7 +331,7 @@ export function Home({ chatId }: HomeProps = {}) {
return () => ro.disconnect()
}, [hasMessages])

if (!hasMessages && chatId && isLoadingHistory) {
if (chatId && (isLoadingHistory || isReconnecting)) {
return (
<ChatSkeleton>
<UserInput
Expand Down
167 changes: 117 additions & 50 deletions apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import type {
export interface UseChatReturn {
messages: ChatMessage[]
isSending: boolean
isReconnecting: boolean
error: string | null
resolvedChatId: string | undefined
sendMessage: (
Expand Down Expand Up @@ -250,6 +251,7 @@ export function useChat(
const queryClient = useQueryClient()
const [messages, setMessages] = useState<ChatMessage[]>([])
const [isSending, setIsSending] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [error, setError] = useState<string | null>(null)
const [resolvedChatId, setResolvedChatId] = useState<string | undefined>(initialChatId)
const [resources, setResources] = useState<MothershipResource[]>([])
Expand All @@ -268,6 +270,10 @@ export function useChat(
}, [messageQueue])

const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
const processSSEStreamRef = useRef<
(reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => Promise<void>
>(async () => {})
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})

const abortControllerRef = useRef<AbortController | null>(null)
const chatIdRef = useRef<string | undefined>(initialChatId)
Expand Down Expand Up @@ -329,6 +335,7 @@ export function useChat(
setMessages([])
setError(null)
setIsSending(false)
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setMessageQueue([])
Expand All @@ -346,6 +353,7 @@ export function useChat(
setMessages([])
setError(null)
setIsSending(false)
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setMessageQueue([])
Expand All @@ -365,6 +373,95 @@ export function useChat(
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
}
}

// Kick off stream reconnection immediately if there's an active stream.
// The stream snapshot was fetched in parallel with the chat history (same
// API call), so there's no extra round-trip.
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
streamIdRef.current = activeStreamId
sendingRef.current = true
setIsReconnecting(true)

const assistantId = crypto.randomUUID()

const reconnect = async () => {
try {
const encoder = new TextEncoder()

const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''

if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
// No snapshot available — stream buffer expired. Clean up.
const cid = chatIdRef.current
if (cid) {
fetch('/api/mothership/chat/stop', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }),
}).catch(() => {})
}
return
}

setIsSending(true)
setIsReconnecting(false)

const lastEventId =
batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
const isStreamDone = streamStatus === 'complete' || streamStatus === 'error'

const combinedStream = new ReadableStream<Uint8Array>({
async start(controller) {
if (batchEvents.length > 0) {
const sseText = batchEvents
.map((e) => `data: ${JSON.stringify(e.event)}\n`)
.join('\n')
controller.enqueue(encoder.encode(`${sseText}\n`))
}

if (!isStreamDone) {
try {
const sseRes = await fetch(
`/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
{ signal: abortController.signal }
)
if (sseRes.ok && sseRes.body) {
const reader = sseRes.body.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
controller.enqueue(value)
}
}
} catch (err) {
if (!(err instanceof Error && err.name === 'AbortError')) {
logger.warn('SSE tail failed during reconnect', err)
}
}
}

controller.close()
},
})

await processSSEStreamRef.current(combinedStream.getReader(), assistantId)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
} finally {
setIsReconnecting(false)
if (streamGenRef.current === gen) {
finalizeRef.current()
}
}
}
reconnect()
}
}, [chatHistory, workspaceId])

useEffect(() => {
Expand Down Expand Up @@ -405,11 +502,14 @@ export function useChat(

const flush = () => {
streamingBlocksRef.current = [...blocks]
setMessages((prev) =>
prev.map((m) =>
m.id === assistantId ? { ...m, content: runningText, contentBlocks: [...blocks] } : m
)
)
const snapshot = { content: runningText, contentBlocks: [...blocks] }
setMessages((prev) => {
const idx = prev.findIndex((m) => m.id === assistantId)
if (idx >= 0) {
return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
}
return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }]
})
}

while (true) {
Expand Down Expand Up @@ -662,6 +762,9 @@ export function useChat(
},
[workspaceId, queryClient, addResource, removeResource]
)
useLayoutEffect(() => {
processSSEStreamRef.current = processSSEStream
})

const persistPartialResponse = useCallback(async () => {
const chatId = chatIdRef.current
Expand Down Expand Up @@ -750,50 +853,9 @@ export function useChat(
},
[invalidateChatQueries]
)

useEffect(() => {
const activeStreamId = chatHistory?.activeStreamId
if (!activeStreamId || !appliedChatIdRef.current || sendingRef.current) return

const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
sendingRef.current = true
setIsSending(true)

const assistantId = crypto.randomUUID()
setMessages((prev) => [
...prev,
{
id: assistantId,
role: 'assistant' as const,
content: '',
contentBlocks: [],
},
])

const reconnect = async () => {
try {
const response = await fetch(`/api/copilot/chat/stream?streamId=${activeStreamId}&from=0`, {
signal: abortController.signal,
})
if (!response.ok || !response.body) return
await processSSEStream(response.body.getReader(), assistantId)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
} finally {
if (streamGenRef.current === gen) {
finalize()
}
}
}
reconnect()

return () => {
abortController.abort()
appliedChatIdRef.current = undefined
}
}, [chatHistory?.activeStreamId, processSSEStream, finalize])
useLayoutEffect(() => {
finalizeRef.current = finalize
})

const sendMessage = useCallback(
async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => {
Expand Down Expand Up @@ -937,7 +999,11 @@ export function useChat(
if (sendingRef.current) {
await persistPartialResponse()
}
const sid = streamIdRef.current
const sid =
streamIdRef.current ||
queryClient.getQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current))
?.activeStreamId ||
undefined
streamGenRef.current++
abortControllerRef.current?.abort()
abortControllerRef.current = null
Expand Down Expand Up @@ -1054,6 +1120,7 @@ export function useChat(
return {
messages,
isSending,
isReconnecting,
error,
resolvedChatId,
sendMessage,
Expand Down
Loading
Loading