Skip to content

Commit b5c2070

Browse files
committed
Fix signaling
1 parent 8651081 commit b5c2070

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { NextResponse } from 'next/server'
2+
import { abortActiveStream } from '@/lib/copilot/chat-streaming'
3+
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
4+
5+
export async function POST(request: Request) {
6+
const { userId: authenticatedUserId, isAuthenticated } =
7+
await authenticateCopilotRequestSessionOnly()
8+
9+
if (!isAuthenticated || !authenticatedUserId) {
10+
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
11+
}
12+
13+
const body = await request.json().catch(() => ({}))
14+
const streamId = typeof body.streamId === 'string' ? body.streamId : ''
15+
16+
if (!streamId) {
17+
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
18+
}
19+
20+
const aborted = abortActiveStream(streamId)
21+
return NextResponse.json({ aborted })
22+
}

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,17 +614,28 @@ export function useChat(workspaceId: string, initialChatId?: string): UseChatRet
614614
if (sendingRef.current) {
615615
await persistPartialResponse()
616616
}
617+
const sid = streamIdRef.current
617618
streamGenRef.current++
618619
abortControllerRef.current?.abort()
619620
abortControllerRef.current = null
620621
sendingRef.current = false
621622
setIsSending(false)
622623
invalidateChatQueries()
624+
if (sid) {
625+
fetch('/api/copilot/chat/abort', {
626+
method: 'POST',
627+
headers: { 'Content-Type': 'application/json' },
628+
body: JSON.stringify({ streamId: sid }),
629+
}).catch(() => {})
630+
}
623631
}, [invalidateChatQueries, persistPartialResponse])
624632

625633
useEffect(() => {
626634
return () => {
627635
streamGenRef.current++
636+
// Only drop the browser→Sim read; the Sim→Go stream stays open
637+
// so the backend can finish persisting. Explicit abort is only
638+
// triggered by the stop button via /api/copilot/chat/abort.
628639
abortControllerRef.current?.abort()
629640
abortControllerRef.current = null
630641
sendingRef.current = false

apps/sim/lib/copilot/chat-streaming.ts

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,18 @@ import { env } from '@/lib/core/config/env'
1414

1515
const logger = createLogger('CopilotChatStreaming')
1616

17+
// Registry of in-flight Sim→Go streams so the explicit abort endpoint can
18+
// reach them. Keyed by streamId, cleaned up when the stream completes.
19+
const activeStreams = new Map<string, AbortController>()
20+
21+
export function abortActiveStream(streamId: string): boolean {
22+
const controller = activeStreams.get(streamId)
23+
if (!controller) return false
24+
controller.abort()
25+
activeStreams.delete(streamId)
26+
return true
27+
}
28+
1729
const FLUSH_EVENT_TYPES = new Set([
1830
'tool_call',
1931
'tool_result',
@@ -94,6 +106,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
94106
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
95107
let clientDisconnected = false
96108
const abortController = new AbortController()
109+
activeStreams.set(streamId, abortController)
97110

98111
return new ReadableStream({
99112
async start(controller) {
@@ -163,8 +176,14 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
163176
await eventWriter.close()
164177
await setStreamMeta(streamId, { status: 'complete', userId })
165178
} catch (error) {
166-
if (clientDisconnected || abortController.signal.aborted) {
167-
logger.info(`[${requestId}] Stream aborted by client disconnect`)
179+
if (abortController.signal.aborted) {
180+
logger.info(`[${requestId}] Stream aborted by explicit stop`)
181+
await eventWriter.close().catch(() => {})
182+
await setStreamMeta(streamId, { status: 'complete', userId })
183+
return
184+
}
185+
if (clientDisconnected) {
186+
logger.info(`[${requestId}] Stream ended after client disconnect`)
168187
await eventWriter.close().catch(() => {})
169188
await setStreamMeta(streamId, { status: 'complete', userId })
170189
return
@@ -183,14 +202,18 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
183202
},
184203
})
185204
} finally {
186-
controller.close()
205+
activeStreams.delete(streamId)
206+
try {
207+
controller.close()
208+
} catch {
209+
// Controller already closed from cancel() — safe to ignore
210+
}
187211
}
188212
},
189-
async cancel() {
213+
cancel() {
190214
clientDisconnected = true
191-
abortController.abort()
192215
if (eventWriter) {
193-
await eventWriter.flush()
216+
eventWriter.flush().catch(() => {})
194217
}
195218
},
196219
})

0 commit comments

Comments
 (0)