Skip to content

Commit 00ff21a

Browse files
TheodoreSpeaksTheodore Li
andauthored
fix(workflow) Fix embedded workflow logs (#3587)
* Extract workflow run logic into shared util * Fix lint * Fix isRunning being stuck in true * Fix lint --------- Co-authored-by: Theodore Li <theo@sim.ai>
1 parent a2f8ed0 commit 00ff21a

File tree

2 files changed

+363
-414
lines changed

2 files changed

+363
-414
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 12 additions & 302 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,6 @@ import { useQueryClient } from '@tanstack/react-query'
44
import { v4 as uuidv4 } from 'uuid'
55
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
66
import { processStreamingBlockLogs } from '@/lib/tokenization'
7-
import type {
8-
BlockCompletedData,
9-
BlockErrorData,
10-
BlockStartedData,
11-
} from '@/lib/workflows/executor/execution-events'
127
import {
138
extractTriggerMockPayload,
149
selectBestTrigger,
@@ -21,21 +16,14 @@ import {
2116
} from '@/lib/workflows/triggers/triggers'
2217
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
2318
import {
24-
markOutgoingEdgesFromOutput,
25-
updateActiveBlockRefCount,
19+
type BlockEventHandlerConfig,
20+
createBlockEventHandlers,
2621
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
2722
import { getBlock } from '@/blocks'
2823
import type { SerializableExecutionState } from '@/executor/execution/types'
29-
import type {
30-
BlockLog,
31-
BlockState,
32-
ExecutionResult,
33-
NormalizedBlockOutput,
34-
StreamingExecution,
35-
} from '@/executor/types'
24+
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
3625
import { hasExecutionResult } from '@/executor/utils/errors'
3726
import { coerceValue } from '@/executor/utils/start-block'
38-
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
3927
import { subscriptionKeys } from '@/hooks/queries/subscription'
4028
import { useExecutionStream } from '@/hooks/use-execution-stream'
4129
import { WorkflowValidationError } from '@/serializer'
@@ -63,20 +51,6 @@ interface DebugValidationResult {
6351
error?: string
6452
}
6553

66-
interface BlockEventHandlerConfig {
67-
workflowId?: string
68-
executionIdRef: { current: string }
69-
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
70-
activeBlocksSet: Set<string>
71-
activeBlockRefCounts: Map<string, number>
72-
accumulatedBlockLogs: BlockLog[]
73-
accumulatedBlockStates: Map<string, BlockState>
74-
executedBlockIds: Set<string>
75-
consoleMode: 'update' | 'add'
76-
includeStartConsoleEntry: boolean
77-
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
78-
}
79-
8054
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
8155

8256
function isRecord(value: unknown): value is Record<string, unknown> {
@@ -309,279 +283,15 @@ export function useWorkflowExecution() {
309283
)
310284

311285
const buildBlockEventHandlers = useCallback(
312-
(config: BlockEventHandlerConfig) => {
313-
const {
314-
workflowId,
315-
executionIdRef,
316-
workflowEdges,
317-
activeBlocksSet,
318-
activeBlockRefCounts,
319-
accumulatedBlockLogs,
320-
accumulatedBlockStates,
321-
executedBlockIds,
322-
consoleMode,
323-
includeStartConsoleEntry,
324-
onBlockCompleteCallback,
325-
} = config
326-
327-
/** Returns true if this execution was cancelled or superseded by another run. */
328-
const isStaleExecution = () =>
329-
!!(
330-
workflowId &&
331-
executionIdRef.current &&
332-
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
333-
)
334-
335-
const updateActiveBlocks = (blockId: string, isActive: boolean) => {
336-
if (!workflowId) return
337-
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
338-
setActiveBlocks(workflowId, new Set(activeBlocksSet))
339-
}
340-
341-
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
342-
if (!workflowId) return
343-
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
344-
}
345-
346-
const isContainerBlockType = (blockType?: string) => {
347-
return blockType === 'loop' || blockType === 'parallel'
348-
}
349-
350-
/** Extracts iteration and child-workflow fields shared across console entry call sites. */
351-
const extractIterationFields = (
352-
data: BlockStartedData | BlockCompletedData | BlockErrorData
353-
) => ({
354-
iterationCurrent: data.iterationCurrent,
355-
iterationTotal: data.iterationTotal,
356-
iterationType: data.iterationType,
357-
iterationContainerId: data.iterationContainerId,
358-
parentIterations: data.parentIterations,
359-
childWorkflowBlockId: data.childWorkflowBlockId,
360-
childWorkflowName: data.childWorkflowName,
361-
...('childWorkflowInstanceId' in data && {
362-
childWorkflowInstanceId: data.childWorkflowInstanceId,
363-
}),
364-
})
365-
366-
const createBlockLogEntry = (
367-
data: BlockCompletedData | BlockErrorData,
368-
options: { success: boolean; output?: unknown; error?: string }
369-
): BlockLog => ({
370-
blockId: data.blockId,
371-
blockName: data.blockName || 'Unknown Block',
372-
blockType: data.blockType || 'unknown',
373-
input: data.input || {},
374-
output: options.output ?? {},
375-
success: options.success,
376-
error: options.error,
377-
durationMs: data.durationMs,
378-
startedAt: data.startedAt,
379-
executionOrder: data.executionOrder,
380-
endedAt: data.endedAt,
381-
})
382-
383-
const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
384-
if (!workflowId) return
385-
addConsole({
386-
input: data.input || {},
387-
output,
388-
success: true,
389-
durationMs: data.durationMs,
390-
startedAt: data.startedAt,
391-
executionOrder: data.executionOrder,
392-
endedAt: data.endedAt,
393-
workflowId,
394-
blockId: data.blockId,
395-
executionId: executionIdRef.current,
396-
blockName: data.blockName || 'Unknown Block',
397-
blockType: data.blockType || 'unknown',
398-
...extractIterationFields(data),
399-
})
400-
}
401-
402-
const addConsoleErrorEntry = (data: BlockErrorData) => {
403-
if (!workflowId) return
404-
addConsole({
405-
input: data.input || {},
406-
output: {},
407-
success: false,
408-
error: data.error,
409-
durationMs: data.durationMs,
410-
startedAt: data.startedAt,
411-
executionOrder: data.executionOrder,
412-
endedAt: data.endedAt,
413-
workflowId,
414-
blockId: data.blockId,
415-
executionId: executionIdRef.current,
416-
blockName: data.blockName || 'Unknown Block',
417-
blockType: data.blockType || 'unknown',
418-
...extractIterationFields(data),
419-
})
420-
}
421-
422-
const updateConsoleEntry = (data: BlockCompletedData) => {
423-
updateConsole(
424-
data.blockId,
425-
{
426-
executionOrder: data.executionOrder,
427-
input: data.input || {},
428-
replaceOutput: data.output,
429-
success: true,
430-
durationMs: data.durationMs,
431-
startedAt: data.startedAt,
432-
endedAt: data.endedAt,
433-
isRunning: false,
434-
...extractIterationFields(data),
435-
},
436-
executionIdRef.current
437-
)
438-
}
439-
440-
const updateConsoleErrorEntry = (data: BlockErrorData) => {
441-
updateConsole(
442-
data.blockId,
443-
{
444-
executionOrder: data.executionOrder,
445-
input: data.input || {},
446-
replaceOutput: {},
447-
success: false,
448-
error: data.error,
449-
durationMs: data.durationMs,
450-
startedAt: data.startedAt,
451-
endedAt: data.endedAt,
452-
isRunning: false,
453-
...extractIterationFields(data),
454-
},
455-
executionIdRef.current
456-
)
457-
}
458-
459-
const onBlockStarted = (data: BlockStartedData) => {
460-
if (isStaleExecution()) return
461-
updateActiveBlocks(data.blockId, true)
462-
463-
if (!includeStartConsoleEntry || !workflowId) return
464-
465-
const startedAt = new Date().toISOString()
466-
addConsole({
467-
input: {},
468-
output: undefined,
469-
success: undefined,
470-
durationMs: undefined,
471-
startedAt,
472-
executionOrder: data.executionOrder,
473-
endedAt: undefined,
474-
workflowId,
475-
blockId: data.blockId,
476-
executionId: executionIdRef.current,
477-
blockName: data.blockName || 'Unknown Block',
478-
blockType: data.blockType || 'unknown',
479-
isRunning: true,
480-
...extractIterationFields(data),
481-
})
482-
}
483-
484-
const onBlockCompleted = (data: BlockCompletedData) => {
485-
if (isStaleExecution()) return
486-
updateActiveBlocks(data.blockId, false)
487-
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
488-
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
489-
executedBlockIds.add(data.blockId)
490-
accumulatedBlockStates.set(data.blockId, {
491-
output: data.output,
492-
executed: true,
493-
executionTime: data.durationMs,
494-
})
495-
496-
// For nested containers, the SSE blockId may be a cloned ID (e.g. P1__obranch-0).
497-
// Also record the original workflow-level ID so the canvas can highlight it.
498-
if (isContainerBlockType(data.blockType)) {
499-
const originalId = stripCloneSuffixes(data.blockId)
500-
if (originalId !== data.blockId) {
501-
executedBlockIds.add(originalId)
502-
if (workflowId) setBlockRunStatus(workflowId, originalId, 'success')
503-
}
504-
}
505-
506-
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
507-
const output = data.output as Record<string, any> | undefined
508-
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
509-
if (!isEmptySubflow) return
510-
}
511-
512-
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
513-
514-
if (consoleMode === 'update') {
515-
updateConsoleEntry(data)
516-
} else {
517-
addConsoleEntry(data, data.output as NormalizedBlockOutput)
518-
}
519-
520-
if (onBlockCompleteCallback) {
521-
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
522-
logger.error('Error in onBlockComplete callback:', error)
523-
})
524-
}
525-
}
526-
527-
const onBlockError = (data: BlockErrorData) => {
528-
if (isStaleExecution()) return
529-
updateActiveBlocks(data.blockId, false)
530-
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
531-
markOutgoingEdges(data.blockId, { error: data.error })
532-
533-
executedBlockIds.add(data.blockId)
534-
accumulatedBlockStates.set(data.blockId, {
535-
output: { error: data.error },
536-
executed: true,
537-
executionTime: data.durationMs || 0,
538-
})
539-
540-
// For nested containers, also record the original workflow-level ID
541-
if (isContainerBlockType(data.blockType)) {
542-
const originalId = stripCloneSuffixes(data.blockId)
543-
if (originalId !== data.blockId) {
544-
executedBlockIds.add(originalId)
545-
if (workflowId) setBlockRunStatus(workflowId, originalId, 'error')
546-
}
547-
}
548-
549-
accumulatedBlockLogs.push(
550-
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
551-
)
552-
553-
if (consoleMode === 'update') {
554-
updateConsoleErrorEntry(data)
555-
} else {
556-
addConsoleErrorEntry(data)
557-
}
558-
}
559-
560-
const onBlockChildWorkflowStarted = (data: {
561-
blockId: string
562-
childWorkflowInstanceId: string
563-
iterationCurrent?: number
564-
iterationContainerId?: string
565-
executionOrder?: number
566-
}) => {
567-
if (isStaleExecution()) return
568-
updateConsole(
569-
data.blockId,
570-
{
571-
childWorkflowInstanceId: data.childWorkflowInstanceId,
572-
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
573-
...(data.iterationContainerId !== undefined && {
574-
iterationContainerId: data.iterationContainerId,
575-
}),
576-
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
577-
},
578-
executionIdRef.current
579-
)
580-
}
581-
582-
return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
583-
},
584-
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
286+
(config: BlockEventHandlerConfig) =>
287+
createBlockEventHandlers(config, {
288+
addConsole,
289+
updateConsole,
290+
setActiveBlocks,
291+
setBlockRunStatus,
292+
setEdgeRunStatus,
293+
}),
294+
[addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus]
585295
)
586296

587297
/**

0 commit comments

Comments
 (0)