Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apps/sim/app/api/a2a/serve/[agentId]/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ async function handleMessageStream(
if (response.body && isStreamingResponse) {
const reader = response.body.getReader()
const decoder = new TextDecoder()
let accumulatedContent = ''
const contentChunks: string[] = []
let finalContent: string | undefined

while (true) {
Expand All @@ -722,7 +722,7 @@ async function handleMessageStream(
const parsed = parseWorkflowSSEChunk(rawChunk)

if (parsed.content) {
accumulatedContent += parsed.content
contentChunks.push(parsed.content)
sendEvent('message', {
kind: 'message',
taskId,
Expand All @@ -738,6 +738,7 @@ async function handleMessageStream(
}
}

const accumulatedContent = contentChunks.join('')
const messageContent =
(finalContent !== undefined && finalContent.length > 0
? finalContent
Expand Down
8 changes: 6 additions & 2 deletions apps/sim/app/api/copilot/chat/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,17 @@ export async function POST(req: NextRequest) {
},
})
} finally {
controller.close()
try {
controller.close()
} catch {
// controller may already be closed by cancel()
}
}
},
async cancel() {
clientDisconnected = true
if (eventWriter) {
await eventWriter.flush()
await eventWriter.close().catch(() => {})
}
},
})
Expand Down
12 changes: 12 additions & 0 deletions apps/sim/app/api/tools/video/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ async function generateWithRunway(
})

if (!statusResponse.ok) {
await statusResponse.text().catch(() => {})
throw new Error(`Runway status check failed: ${statusResponse.status}`)
}

Expand All @@ -352,6 +353,7 @@ async function generateWithRunway(

const videoResponse = await fetch(statusData.output[0])
if (!videoResponse.ok) {
await videoResponse.text().catch(() => {})
throw new Error(`Failed to download video: ${videoResponse.status}`)
}

Expand Down Expand Up @@ -448,6 +450,7 @@ async function generateWithVeo(
)

if (!statusResponse.ok) {
await statusResponse.text().catch(() => {})
throw new Error(`Veo status check failed: ${statusResponse.status}`)
}

Expand All @@ -472,6 +475,7 @@ async function generateWithVeo(
})

if (!videoResponse.ok) {
await videoResponse.text().catch(() => {})
throw new Error(`Failed to download video: ${videoResponse.status}`)
}

Expand Down Expand Up @@ -561,6 +565,7 @@ async function generateWithLuma(
)

if (!statusResponse.ok) {
await statusResponse.text().catch(() => {})
throw new Error(`Luma status check failed: ${statusResponse.status}`)
}

Expand All @@ -576,6 +581,7 @@ async function generateWithLuma(

const videoResponse = await fetch(videoUrl)
if (!videoResponse.ok) {
await videoResponse.text().catch(() => {})
throw new Error(`Failed to download video: ${videoResponse.status}`)
}

Expand Down Expand Up @@ -679,6 +685,7 @@ async function generateWithMiniMax(
)

if (!statusResponse.ok) {
await statusResponse.text().catch(() => {})
throw new Error(`MiniMax status check failed: ${statusResponse.status}`)
}

Expand Down Expand Up @@ -712,6 +719,7 @@ async function generateWithMiniMax(
)

if (!fileResponse.ok) {
await fileResponse.text().catch(() => {})
throw new Error(`Failed to download video: ${fileResponse.status}`)
}

Expand All @@ -725,6 +733,7 @@ async function generateWithMiniMax(
// Download the actual video file
const videoResponse = await fetch(videoUrl)
if (!videoResponse.ok) {
await videoResponse.text().catch(() => {})
throw new Error(`Failed to download video from URL: ${videoResponse.status}`)
}

Expand Down Expand Up @@ -881,6 +890,7 @@ async function generateWithFalAI(
)

if (!statusResponse.ok) {
await statusResponse.text().catch(() => {})
throw new Error(`Fal.ai status check failed: ${statusResponse.status}`)
}

Expand All @@ -899,6 +909,7 @@ async function generateWithFalAI(
)

if (!resultResponse.ok) {
await resultResponse.text().catch(() => {})
throw new Error(`Failed to fetch result: ${resultResponse.status}`)
}

Expand All @@ -911,6 +922,7 @@ async function generateWithFalAI(

const videoResponse = await fetch(videoUrl)
if (!videoResponse.ok) {
await videoResponse.text().catch(() => {})
throw new Error(`Failed to download video: ${videoResponse.status}`)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ export function useWorkflowExecution() {
const stream = new ReadableStream({
async start(controller) {
const { encodeSSE } = await import('@/lib/core/utils/sse')
const streamedContent = new Map<string, string>()
const streamedChunks = new Map<string, string[]>()
const streamReadingPromises: Promise<void>[] = []

const safeEnqueue = (data: Uint8Array) => {
Expand Down Expand Up @@ -845,8 +845,8 @@ export function useWorkflowExecution() {
const reader = streamingExecution.stream.getReader()
const blockId = (streamingExecution.execution as any)?.blockId

if (blockId && !streamedContent.has(blockId)) {
streamedContent.set(blockId, '')
if (blockId && !streamedChunks.has(blockId)) {
streamedChunks.set(blockId, [])
}

try {
Expand All @@ -860,13 +860,13 @@ export function useWorkflowExecution() {
}
const chunk = new TextDecoder().decode(value)
if (blockId) {
streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk)
streamedChunks.get(blockId)!.push(chunk)
}

let chunkToSend = chunk
if (blockId && !processedFirstChunk.has(blockId)) {
processedFirstChunk.add(blockId)
if (streamedContent.size > 1) {
if (streamedChunks.size > 1) {
chunkToSend = `\n\n${chunk}`
}
}
Expand All @@ -884,7 +884,7 @@ export function useWorkflowExecution() {
// Handle non-streaming blocks (like Function blocks)
const onBlockComplete = async (blockId: string, output: any) => {
// Skip if this block already had streaming content (avoid duplicates)
if (streamedContent.has(blockId)) {
if (streamedChunks.has(blockId)) {
logger.debug('[handleRunWorkflow] Skipping onBlockComplete for streaming block', {
blockId,
})
Expand Down Expand Up @@ -921,13 +921,13 @@ export function useWorkflowExecution() {
: JSON.stringify(outputValue, null, 2)

// Add separator if this isn't the first output
const separator = streamedContent.size > 0 ? '\n\n' : ''
const separator = streamedChunks.size > 0 ? '\n\n' : ''

// Send the non-streaming block output as a chunk
safeEnqueue(encodeSSE({ blockId, chunk: separator + formattedOutput }))

// Track that we've sent output for this block
streamedContent.set(blockId, formattedOutput)
streamedChunks.set(blockId, [formattedOutput])
}
}
}
Expand Down Expand Up @@ -969,6 +969,12 @@ export function useWorkflowExecution() {
})
}

// Resolve chunks to final strings for consumption
const streamedContent = new Map<string, string>()
for (const [id, chunks] of streamedChunks) {
streamedContent.set(id, chunks.join(''))
}

// Update streamed content and apply tokenization
if (result.logs) {
result.logs.forEach((log: BlockLog) => {
Expand Down Expand Up @@ -1316,7 +1322,7 @@ export function useWorkflowExecution() {

const activeBlocksSet = new Set<string>()
const activeBlockRefCounts = new Map<string, number>()
const streamedContent = new Map<string, string>()
const streamedChunks = new Map<string, string[]>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
Expand Down Expand Up @@ -1374,8 +1380,10 @@ export function useWorkflowExecution() {
onBlockChildWorkflowStarted: blockHandlers.onBlockChildWorkflowStarted,

onStreamChunk: (data) => {
const existing = streamedContent.get(data.blockId) || ''
streamedContent.set(data.blockId, existing + data.chunk)
if (!streamedChunks.has(data.blockId)) {
streamedChunks.set(data.blockId, [])
}
streamedChunks.get(data.blockId)!.push(data.chunk)

// Call onStream callback if provided (create a fake StreamingExecution)
if (onStream && isExecutingFromChat) {
Expand All @@ -1386,11 +1394,12 @@ export function useWorkflowExecution() {
},
})

const accumulated = streamedChunks.get(data.blockId)!.join('')
Comment thread
waleedlatif1 marked this conversation as resolved.
Outdated
const streamingExec: StreamingExecution = {
stream,
execution: {
success: true,
output: { content: existing + data.chunk },
output: { content: accumulated },
blockId: data.blockId,
} as any,
}
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ export class AgentBlockHandler implements BlockHandler {
})

if (!response.ok) {
await response.text().catch(() => {})
logger.error(`Failed to fetch custom tools: ${response.status}`)
return null
}
Expand Down
9 changes: 5 additions & 4 deletions apps/sim/executor/handlers/agent/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,22 @@ export class Memory {
ctx: ExecutionContext,
inputs: AgentInputs
): ReadableStream<Uint8Array> {
let accumulatedContent = ''
const chunks: string[] = []
const decoder = new TextDecoder()

const transformStream = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(chunk)
const decoded = decoder.decode(chunk, { stream: true })
accumulatedContent += decoded
chunks.push(decoded)
},

flush: () => {
if (accumulatedContent.trim()) {
const content = chunks.join('')
if (content.trim()) {
this.appendToMemory(ctx, inputs, {
role: 'assistant',
content: accumulatedContent,
content,
}).catch((error) => logger.error('Failed to persist streaming response:', error))
}
},
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/handlers/workflow/workflow-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ describe('WorkflowBlockHandler', () => {
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve(''),
})

await expect(handler.execute(mockContext, mockBlock, inputs)).rejects.toThrow(
Expand All @@ -168,6 +169,7 @@ describe('WorkflowBlockHandler', () => {
ok: false,
status: 404,
statusText: 'Not Found',
text: () => Promise.resolve(''),
})

const result = await (handler as any).loadChildWorkflow(workflowId)
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/workflow/workflow-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ export class WorkflowBlockHandler implements BlockHandler {
const response = await fetch(url.toString(), { headers })

if (!response.ok) {
await response.text().catch(() => {})
if (response.status === HTTP.STATUS.NOT_FOUND) {
logger.warn(`Child workflow ${workflowId} not found`)
return null
Expand Down
1 change: 1 addition & 0 deletions apps/sim/lib/a2a/push-notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ export async function deliverPushNotification(taskId: string, state: TaskState):
})

if (!response.ok) {
await response.text().catch(() => {})
logger.error('Push notification delivery failed', {
taskId,
url: config.url,
Expand Down
Loading