Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions apps/sim/app/api/workflows/[id]/execute/route.async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ const {
}))

vi.mock('@/lib/auth/hybrid', () => ({
AuthType: {
SESSION: 'session',
API_KEY: 'api_key',
INTERNAL_JWT: 'internal_jwt',
},
checkHybridAuth: mockCheckHybridAuth,
AuthType: {
SESSION: 'session',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import { processStreamingBlockLogs } from '@/lib/tokenization'
import type {
BlockCompletedData,
BlockErrorData,
BlockStartedData,
} from '@/lib/workflows/executor/execution-events'
import {
extractTriggerMockPayload,
selectBestTrigger,
Expand All @@ -21,21 +16,14 @@ import {
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import {
markOutgoingEdgesFromOutput,
updateActiveBlockRefCount,
type BlockEventHandlerConfig,
createBlockEventHandlers,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type {
BlockLog,
BlockState,
ExecutionResult,
NormalizedBlockOutput,
StreamingExecution,
} from '@/executor/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { stripCloneSuffixes } from '@/executor/utils/subflow-utils'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
Expand Down Expand Up @@ -63,20 +51,6 @@ interface DebugValidationResult {
error?: string
}

interface BlockEventHandlerConfig {
workflowId?: string
executionIdRef: { current: string }
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
activeBlocksSet: Set<string>
activeBlockRefCounts: Map<string, number>
accumulatedBlockLogs: BlockLog[]
accumulatedBlockStates: Map<string, BlockState>
executedBlockIds: Set<string>
consoleMode: 'update' | 'add'
includeStartConsoleEntry: boolean
onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise<void>
}

const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'

function isRecord(value: unknown): value is Record<string, unknown> {
Expand Down Expand Up @@ -309,279 +283,15 @@ export function useWorkflowExecution() {
)

const buildBlockEventHandlers = useCallback(
(config: BlockEventHandlerConfig) => {
const {
workflowId,
executionIdRef,
workflowEdges,
activeBlocksSet,
activeBlockRefCounts,
accumulatedBlockLogs,
accumulatedBlockStates,
executedBlockIds,
consoleMode,
includeStartConsoleEntry,
onBlockCompleteCallback,
} = config

/** Returns true if this execution was cancelled or superseded by another run. */
const isStaleExecution = () =>
!!(
workflowId &&
executionIdRef.current &&
useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current
)

const updateActiveBlocks = (blockId: string, isActive: boolean) => {
if (!workflowId) return
updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive)
setActiveBlocks(workflowId, new Set(activeBlocksSet))
}

const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
if (!workflowId) return
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
}

const isContainerBlockType = (blockType?: string) => {
return blockType === 'loop' || blockType === 'parallel'
}

/** Extracts iteration and child-workflow fields shared across console entry call sites. */
const extractIterationFields = (
data: BlockStartedData | BlockCompletedData | BlockErrorData
) => ({
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
iterationContainerId: data.iterationContainerId,
parentIterations: data.parentIterations,
childWorkflowBlockId: data.childWorkflowBlockId,
childWorkflowName: data.childWorkflowName,
...('childWorkflowInstanceId' in data && {
childWorkflowInstanceId: data.childWorkflowInstanceId,
}),
})

const createBlockLogEntry = (
data: BlockCompletedData | BlockErrorData,
options: { success: boolean; output?: unknown; error?: string }
): BlockLog => ({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: options.output ?? {},
success: options.success,
error: options.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
})

const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => {
if (!workflowId) return
addConsole({
input: data.input || {},
output,
success: true,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
...extractIterationFields(data),
})
}

const addConsoleErrorEntry = (data: BlockErrorData) => {
if (!workflowId) return
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
executionOrder: data.executionOrder,
endedAt: data.endedAt,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
...extractIterationFields(data),
})
}

const updateConsoleEntry = (data: BlockCompletedData) => {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: data.output,
success: true,
durationMs: data.durationMs,
startedAt: data.startedAt,
endedAt: data.endedAt,
isRunning: false,
...extractIterationFields(data),
},
executionIdRef.current
)
}

const updateConsoleErrorEntry = (data: BlockErrorData) => {
updateConsole(
data.blockId,
{
executionOrder: data.executionOrder,
input: data.input || {},
replaceOutput: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: data.startedAt,
endedAt: data.endedAt,
isRunning: false,
...extractIterationFields(data),
},
executionIdRef.current
)
}

const onBlockStarted = (data: BlockStartedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, true)

if (!includeStartConsoleEntry || !workflowId) return

const startedAt = new Date().toISOString()
addConsole({
input: {},
output: undefined,
success: undefined,
durationMs: undefined,
startedAt,
executionOrder: data.executionOrder,
endedAt: undefined,
workflowId,
blockId: data.blockId,
executionId: executionIdRef.current,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
isRunning: true,
...extractIterationFields(data),
})
}

const onBlockCompleted = (data: BlockCompletedData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})

// For nested containers, the SSE blockId may be a cloned ID (e.g. P1__obranch-0).
// Also record the original workflow-level ID so the canvas can highlight it.
if (isContainerBlockType(data.blockType)) {
const originalId = stripCloneSuffixes(data.blockId)
if (originalId !== data.blockId) {
executedBlockIds.add(originalId)
if (workflowId) setBlockRunStatus(workflowId, originalId, 'success')
}
}

if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
const output = data.output as Record<string, any> | undefined
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
if (!isEmptySubflow) return
}

accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))

if (consoleMode === 'update') {
updateConsoleEntry(data)
} else {
addConsoleEntry(data, data.output as NormalizedBlockOutput)
}

if (onBlockCompleteCallback) {
onBlockCompleteCallback(data.blockId, data.output).catch((error) => {
logger.error('Error in onBlockComplete callback:', error)
})
}
}

const onBlockError = (data: BlockErrorData) => {
if (isStaleExecution()) return
updateActiveBlocks(data.blockId, false)
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
markOutgoingEdges(data.blockId, { error: data.error })

executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: { error: data.error },
executed: true,
executionTime: data.durationMs || 0,
})

// For nested containers, also record the original workflow-level ID
if (isContainerBlockType(data.blockType)) {
const originalId = stripCloneSuffixes(data.blockId)
if (originalId !== data.blockId) {
executedBlockIds.add(originalId)
if (workflowId) setBlockRunStatus(workflowId, originalId, 'error')
}
}

accumulatedBlockLogs.push(
createBlockLogEntry(data, { success: false, output: {}, error: data.error })
)

if (consoleMode === 'update') {
updateConsoleErrorEntry(data)
} else {
addConsoleErrorEntry(data)
}
}

const onBlockChildWorkflowStarted = (data: {
blockId: string
childWorkflowInstanceId: string
iterationCurrent?: number
iterationContainerId?: string
executionOrder?: number
}) => {
if (isStaleExecution()) return
updateConsole(
data.blockId,
{
childWorkflowInstanceId: data.childWorkflowInstanceId,
...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }),
...(data.iterationContainerId !== undefined && {
iterationContainerId: data.iterationContainerId,
}),
...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }),
},
executionIdRef.current
)
}

return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted }
},
[addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole]
(config: BlockEventHandlerConfig) =>
createBlockEventHandlers(config, {
addConsole,
updateConsole,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
}),
[addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus]
)

/**
Expand Down
Loading
Loading