diff --git a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts index dc94343ad1..7d6c599dcf 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts @@ -18,11 +18,6 @@ const { })) vi.mock('@/lib/auth/hybrid', () => ({ - AuthType: { - SESSION: 'session', - API_KEY: 'api_key', - INTERNAL_JWT: 'internal_jwt', - }, checkHybridAuth: mockCheckHybridAuth, AuthType: { SESSION: 'session', diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index b8c7604fcc..0943d3e04b 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -4,11 +4,6 @@ import { useQueryClient } from '@tanstack/react-query' import { v4 as uuidv4 } from 'uuid' import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans' import { processStreamingBlockLogs } from '@/lib/tokenization' -import type { - BlockCompletedData, - BlockErrorData, - BlockStartedData, -} from '@/lib/workflows/executor/execution-events' import { extractTriggerMockPayload, selectBestTrigger, @@ -21,21 +16,14 @@ import { } from '@/lib/workflows/triggers/triggers' import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow' import { - markOutgoingEdgesFromOutput, - updateActiveBlockRefCount, + type BlockEventHandlerConfig, + createBlockEventHandlers, } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils' import { getBlock } from '@/blocks' import type { SerializableExecutionState } from '@/executor/execution/types' -import type { - BlockLog, - BlockState, - ExecutionResult, - NormalizedBlockOutput, - StreamingExecution, -} from '@/executor/types' +import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types' import { hasExecutionResult } from '@/executor/utils/errors' import { coerceValue } from '@/executor/utils/start-block' -import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' import { subscriptionKeys } from '@/hooks/queries/subscription' import { useExecutionStream } from '@/hooks/use-execution-stream' import { WorkflowValidationError } from '@/serializer' @@ -63,20 +51,6 @@ interface DebugValidationResult { error?: string } -interface BlockEventHandlerConfig { - workflowId?: string - executionIdRef: { current: string } - workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }> - activeBlocksSet: Set - activeBlockRefCounts: Map - accumulatedBlockLogs: BlockLog[] - accumulatedBlockStates: Map - executedBlockIds: Set - consoleMode: 'update' | 'add' - includeStartConsoleEntry: boolean - onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise -} - const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed' function isRecord(value: unknown): value is Record { @@ -309,279 +283,15 @@ export function useWorkflowExecution() { ) const buildBlockEventHandlers = useCallback( - (config: BlockEventHandlerConfig) => { - const { - workflowId, - executionIdRef, - workflowEdges, - activeBlocksSet, - activeBlockRefCounts, - accumulatedBlockLogs, - accumulatedBlockStates, - executedBlockIds, - consoleMode, - includeStartConsoleEntry, - onBlockCompleteCallback, - } = config - - /** Returns true if this execution was cancelled or superseded by another run. */ - const isStaleExecution = () => - !!( - workflowId && - executionIdRef.current && - useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current - ) - - const updateActiveBlocks = (blockId: string, isActive: boolean) => { - if (!workflowId) return - updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive) - setActiveBlocks(workflowId, new Set(activeBlocksSet)) - } - - const markOutgoingEdges = (blockId: string, output: Record | undefined) => { - if (!workflowId) return - markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus) - } - - const isContainerBlockType = (blockType?: string) => { - return blockType === 'loop' || blockType === 'parallel' - } - - /** Extracts iteration and child-workflow fields shared across console entry call sites. */ - const extractIterationFields = ( - data: BlockStartedData | BlockCompletedData | BlockErrorData - ) => ({ - iterationCurrent: data.iterationCurrent, - iterationTotal: data.iterationTotal, - iterationType: data.iterationType, - iterationContainerId: data.iterationContainerId, - parentIterations: data.parentIterations, - childWorkflowBlockId: data.childWorkflowBlockId, - childWorkflowName: data.childWorkflowName, - ...('childWorkflowInstanceId' in data && { - childWorkflowInstanceId: data.childWorkflowInstanceId, - }), - }) - - const createBlockLogEntry = ( - data: BlockCompletedData | BlockErrorData, - options: { success: boolean; output?: unknown; error?: string } - ): BlockLog => ({ - blockId: data.blockId, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - input: data.input || {}, - output: options.output ?? {}, - success: options.success, - error: options.error, - durationMs: data.durationMs, - startedAt: data.startedAt, - executionOrder: data.executionOrder, - endedAt: data.endedAt, - }) - - const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { - if (!workflowId) return - addConsole({ - input: data.input || {}, - output, - success: true, - durationMs: data.durationMs, - startedAt: data.startedAt, - executionOrder: data.executionOrder, - endedAt: data.endedAt, - workflowId, - blockId: data.blockId, - executionId: executionIdRef.current, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - ...extractIterationFields(data), - }) - } - - const addConsoleErrorEntry = (data: BlockErrorData) => { - if (!workflowId) return - addConsole({ - input: data.input || {}, - output: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt: data.startedAt, - executionOrder: data.executionOrder, - endedAt: data.endedAt, - workflowId, - blockId: data.blockId, - executionId: executionIdRef.current, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - ...extractIterationFields(data), - }) - } - - const updateConsoleEntry = (data: BlockCompletedData) => { - updateConsole( - data.blockId, - { - executionOrder: data.executionOrder, - input: data.input || {}, - replaceOutput: data.output, - success: true, - durationMs: data.durationMs, - startedAt: data.startedAt, - endedAt: data.endedAt, - isRunning: false, - ...extractIterationFields(data), - }, - executionIdRef.current - ) - } - - const updateConsoleErrorEntry = (data: BlockErrorData) => { - updateConsole( - data.blockId, - { - executionOrder: data.executionOrder, - input: data.input || {}, - replaceOutput: {}, - success: false, - error: data.error, - durationMs: data.durationMs, - startedAt: data.startedAt, - endedAt: data.endedAt, - isRunning: false, - ...extractIterationFields(data), - }, - executionIdRef.current - ) - } - - const onBlockStarted = (data: BlockStartedData) => { - if (isStaleExecution()) return - updateActiveBlocks(data.blockId, true) - - if (!includeStartConsoleEntry || !workflowId) return - - const startedAt = new Date().toISOString() - addConsole({ - input: {}, - output: undefined, - success: undefined, - durationMs: undefined, - startedAt, - executionOrder: data.executionOrder, - endedAt: undefined, - workflowId, - blockId: data.blockId, - executionId: executionIdRef.current, - blockName: data.blockName || 'Unknown Block', - blockType: data.blockType || 'unknown', - isRunning: true, - ...extractIterationFields(data), - }) - } - - const onBlockCompleted = (data: BlockCompletedData) => { - if (isStaleExecution()) return - updateActiveBlocks(data.blockId, false) - if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success') - markOutgoingEdges(data.blockId, data.output as Record | undefined) - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: data.output, - executed: true, - executionTime: data.durationMs, - }) - - // For nested containers, the SSE blockId may be a cloned ID (e.g. P1__obranch-0). - // Also record the original workflow-level ID so the canvas can highlight it. - if (isContainerBlockType(data.blockType)) { - const originalId = stripCloneSuffixes(data.blockId) - if (originalId !== data.blockId) { - executedBlockIds.add(originalId) - if (workflowId) setBlockRunStatus(workflowId, originalId, 'success') - } - } - - if (isContainerBlockType(data.blockType) && !data.iterationContainerId) { - const output = data.output as Record | undefined - const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0 - if (!isEmptySubflow) return - } - - accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) - - if (consoleMode === 'update') { - updateConsoleEntry(data) - } else { - addConsoleEntry(data, data.output as NormalizedBlockOutput) - } - - if (onBlockCompleteCallback) { - onBlockCompleteCallback(data.blockId, data.output).catch((error) => { - logger.error('Error in onBlockComplete callback:', error) - }) - } - } - - const onBlockError = (data: BlockErrorData) => { - if (isStaleExecution()) return - updateActiveBlocks(data.blockId, false) - if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error') - markOutgoingEdges(data.blockId, { error: data.error }) - - executedBlockIds.add(data.blockId) - accumulatedBlockStates.set(data.blockId, { - output: { error: data.error }, - executed: true, - executionTime: data.durationMs || 0, - }) - - // For nested containers, also record the original workflow-level ID - if (isContainerBlockType(data.blockType)) { - const originalId = stripCloneSuffixes(data.blockId) - if (originalId !== data.blockId) { - executedBlockIds.add(originalId) - if (workflowId) setBlockRunStatus(workflowId, originalId, 'error') - } - } - - accumulatedBlockLogs.push( - createBlockLogEntry(data, { success: false, output: {}, error: data.error }) - ) - - if (consoleMode === 'update') { - updateConsoleErrorEntry(data) - } else { - addConsoleErrorEntry(data) - } - } - - const onBlockChildWorkflowStarted = (data: { - blockId: string - childWorkflowInstanceId: string - iterationCurrent?: number - iterationContainerId?: string - executionOrder?: number - }) => { - if (isStaleExecution()) return - updateConsole( - data.blockId, - { - childWorkflowInstanceId: data.childWorkflowInstanceId, - ...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }), - ...(data.iterationContainerId !== undefined && { - iterationContainerId: data.iterationContainerId, - }), - ...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }), - }, - executionIdRef.current - ) - } - - return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted } - }, - [addConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, updateConsole] + (config: BlockEventHandlerConfig) => + createBlockEventHandlers(config, { + addConsole, + updateConsole, + setActiveBlocks, + setBlockRunStatus, + setEdgeRunStatus, + }), + [addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus] ) /** diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts index cd30578f23..7f24029020 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts @@ -1,6 +1,23 @@ +import { createLogger } from '@sim/logger' import { v4 as uuidv4 } from 'uuid' -import type { ExecutionResult, StreamingExecution } from '@/executor/types' +import type { + BlockCompletedData, + BlockErrorData, + BlockStartedData, +} from '@/lib/workflows/executor/execution-events' +import type { + BlockLog, + BlockState, + ExecutionResult, + NormalizedBlockOutput, + StreamingExecution, +} from '@/executor/types' +import { stripCloneSuffixes } from '@/executor/utils/subflow-utils' + +const logger = createLogger('workflow-execution-utils') + import { useExecutionStore } from '@/stores/execution' +import type { ConsoleEntry, ConsoleUpdate } from '@/stores/terminal' import { useTerminalConsoleStore } from '@/stores/terminal' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' @@ -85,6 +102,310 @@ export function markOutgoingEdgesFromOutput( } } +export interface BlockEventHandlerConfig { + workflowId?: string + executionIdRef: { current: string } + workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }> + activeBlocksSet: Set + activeBlockRefCounts: Map + accumulatedBlockLogs: BlockLog[] + accumulatedBlockStates: Map + executedBlockIds: Set + consoleMode: 'update' | 'add' + includeStartConsoleEntry: boolean + onBlockCompleteCallback?: (blockId: string, output: unknown) => Promise +} + +export interface BlockEventHandlerDeps { + addConsole: (entry: Omit) => ConsoleEntry + updateConsole: (blockId: string, update: string | ConsoleUpdate, executionId?: string) => void + setActiveBlocks: (workflowId: string, blocks: Set) => void + setBlockRunStatus: (workflowId: string, blockId: string, status: 'success' | 'error') => void + setEdgeRunStatus: (workflowId: string, edgeId: string, status: 'success' | 'error') => void +} + +/** + * Creates block event handlers for SSE execution events. + * Shared by the workflow execution hook and standalone execution utilities. + */ +export function createBlockEventHandlers( + config: BlockEventHandlerConfig, + deps: BlockEventHandlerDeps +) { + const { + workflowId, + executionIdRef, + workflowEdges, + activeBlocksSet, + activeBlockRefCounts, + accumulatedBlockLogs, + accumulatedBlockStates, + executedBlockIds, + consoleMode, + includeStartConsoleEntry, + onBlockCompleteCallback, + } = config + + const { addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } = deps + + const isStaleExecution = () => + !!( + workflowId && + executionIdRef.current && + useExecutionStore.getState().getCurrentExecutionId(workflowId) !== executionIdRef.current + ) + + const updateActiveBlocks = (blockId: string, isActive: boolean) => { + if (!workflowId) return + updateActiveBlockRefCount(activeBlockRefCounts, activeBlocksSet, blockId, isActive) + setActiveBlocks(workflowId, new Set(activeBlocksSet)) + } + + const markOutgoingEdges = (blockId: string, output: Record | undefined) => { + if (!workflowId) return + markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus) + } + + const isContainerBlockType = (blockType?: string) => { + return blockType === 'loop' || blockType === 'parallel' + } + + const extractIterationFields = ( + data: BlockStartedData | BlockCompletedData | BlockErrorData + ) => ({ + iterationCurrent: data.iterationCurrent, + iterationTotal: data.iterationTotal, + iterationType: data.iterationType, + iterationContainerId: data.iterationContainerId, + parentIterations: data.parentIterations, + childWorkflowBlockId: data.childWorkflowBlockId, + childWorkflowName: data.childWorkflowName, + ...('childWorkflowInstanceId' in data && { + childWorkflowInstanceId: data.childWorkflowInstanceId, + }), + }) + + const createBlockLogEntry = ( + data: BlockCompletedData | BlockErrorData, + options: { success: boolean; output?: unknown; error?: string } + ): BlockLog => ({ + blockId: data.blockId, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + input: data.input || {}, + output: options.output ?? {}, + success: options.success, + error: options.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + }) + + const addConsoleEntry = (data: BlockCompletedData, output: NormalizedBlockOutput) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId: executionIdRef.current, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + ...extractIterationFields(data), + }) + } + + const addConsoleErrorEntry = (data: BlockErrorData) => { + if (!workflowId) return + addConsole({ + input: data.input || {}, + output: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + executionOrder: data.executionOrder, + endedAt: data.endedAt, + workflowId, + blockId: data.blockId, + executionId: executionIdRef.current, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + ...extractIterationFields(data), + }) + } + + const updateConsoleEntry = (data: BlockCompletedData) => { + updateConsole( + data.blockId, + { + executionOrder: data.executionOrder, + input: data.input || {}, + replaceOutput: data.output, + success: true, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + ...extractIterationFields(data), + }, + executionIdRef.current + ) + } + + const updateConsoleErrorEntry = (data: BlockErrorData) => { + updateConsole( + data.blockId, + { + executionOrder: data.executionOrder, + input: data.input || {}, + replaceOutput: {}, + success: false, + error: data.error, + durationMs: data.durationMs, + startedAt: data.startedAt, + endedAt: data.endedAt, + isRunning: false, + ...extractIterationFields(data), + }, + executionIdRef.current + ) + } + + const onBlockStarted = (data: BlockStartedData) => { + if (isStaleExecution()) return + updateActiveBlocks(data.blockId, true) + + if (!includeStartConsoleEntry || !workflowId) return + + const startedAt = new Date().toISOString() + addConsole({ + input: {}, + output: undefined, + success: undefined, + durationMs: undefined, + startedAt, + executionOrder: data.executionOrder, + endedAt: undefined, + workflowId, + blockId: data.blockId, + executionId: executionIdRef.current, + blockName: data.blockName || 'Unknown Block', + blockType: data.blockType || 'unknown', + isRunning: true, + ...extractIterationFields(data), + }) + } + + const onBlockCompleted = (data: BlockCompletedData) => { + if (isStaleExecution()) return + updateActiveBlocks(data.blockId, false) + if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success') + markOutgoingEdges(data.blockId, data.output as Record | undefined) + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: data.output, + executed: true, + executionTime: data.durationMs, + }) + + if (isContainerBlockType(data.blockType)) { + const originalId = stripCloneSuffixes(data.blockId) + if (originalId !== data.blockId) { + executedBlockIds.add(originalId) + if (workflowId) setBlockRunStatus(workflowId, originalId, 'success') + } + } + + if (isContainerBlockType(data.blockType) && !data.iterationContainerId) { + const output = data.output as Record | undefined + const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0 + if (!isEmptySubflow) { + if (includeStartConsoleEntry) { + updateConsoleEntry(data) + } + return + } + } + + accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output })) + + if (consoleMode === 'update') { + updateConsoleEntry(data) + } else { + addConsoleEntry(data, data.output as NormalizedBlockOutput) + } + + if (onBlockCompleteCallback) { + onBlockCompleteCallback(data.blockId, data.output).catch((error) => { + logger.error('Error in onBlockComplete callback:', { blockId: data.blockId, error }) + }) + } + } + + const onBlockError = (data: BlockErrorData) => { + if (isStaleExecution()) return + updateActiveBlocks(data.blockId, false) + if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error') + markOutgoingEdges(data.blockId, { error: data.error }) + + executedBlockIds.add(data.blockId) + accumulatedBlockStates.set(data.blockId, { + output: { error: data.error }, + executed: true, + executionTime: data.durationMs || 0, + }) + + if (isContainerBlockType(data.blockType)) { + const originalId = stripCloneSuffixes(data.blockId) + if (originalId !== data.blockId) { + executedBlockIds.add(originalId) + if (workflowId) setBlockRunStatus(workflowId, originalId, 'error') + } + } + + accumulatedBlockLogs.push( + createBlockLogEntry(data, { success: false, output: {}, error: data.error }) + ) + + if (consoleMode === 'update') { + updateConsoleErrorEntry(data) + } else { + addConsoleErrorEntry(data) + } + } + + const onBlockChildWorkflowStarted = (data: { + blockId: string + childWorkflowInstanceId: string + iterationCurrent?: number + iterationContainerId?: string + executionOrder?: number + }) => { + if (isStaleExecution()) return + updateConsole( + data.blockId, + { + childWorkflowInstanceId: data.childWorkflowInstanceId, + ...(data.iterationCurrent !== undefined && { iterationCurrent: data.iterationCurrent }), + ...(data.iterationContainerId !== undefined && { + iterationContainerId: data.iterationContainerId, + }), + ...(data.executionOrder !== undefined && { executionOrder: data.executionOrder }), + }, + executionIdRef.current + ) + } + + return { onBlockStarted, onBlockCompleted, onBlockError, onBlockChildWorkflowStarted } +} + export interface WorkflowExecutionOptions { workflowId?: string workflowInput?: any @@ -115,7 +436,7 @@ export async function executeWorkflowWithFullLogging( } const executionId = options.executionId || uuidv4() - const { addConsole } = useTerminalConsoleStore.getState() + const { addConsole, updateConsole } = useTerminalConsoleStore.getState() const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus, setCurrentExecutionId } = useExecutionStore.getState() const wfId = targetWorkflowId @@ -123,6 +444,24 @@ export async function executeWorkflowWithFullLogging( const activeBlocksSet = new Set() const activeBlockRefCounts = new Map() + const executionIdRef = { current: executionId } + + const blockHandlers = createBlockEventHandlers( + { + workflowId: wfId, + executionIdRef, + workflowEdges, + activeBlocksSet, + activeBlockRefCounts, + accumulatedBlockLogs: [], + accumulatedBlockStates: new Map(), + executedBlockIds: new Set(), + consoleMode: 'update', + includeStartConsoleEntry: true, + onBlockCompleteCallback: options.onBlockComplete, + }, + { addConsole, updateConsole, setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } + ) const payload: any = { input: options.workflowInput, @@ -188,125 +527,25 @@ export async function executeWorkflowWithFullLogging( switch (event.type) { case 'execution:started': { setCurrentExecutionId(wfId, event.executionId) + executionIdRef.current = event.executionId || executionId break } - case 'block:started': { - updateActiveBlockRefCount( - activeBlockRefCounts, - activeBlocksSet, - event.data.blockId, - true - ) - setActiveBlocks(wfId, new Set(activeBlocksSet)) + + case 'block:started': + blockHandlers.onBlockStarted(event.data) break - } - case 'block:completed': { - updateActiveBlockRefCount( - activeBlockRefCounts, - activeBlocksSet, - event.data.blockId, - false - ) - setActiveBlocks(wfId, new Set(activeBlocksSet)) - - setBlockRunStatus(wfId, event.data.blockId, 'success') - markOutgoingEdgesFromOutput( - event.data.blockId, - event.data.output, - workflowEdges, - wfId, - setEdgeRunStatus - ) - - addConsole({ - input: event.data.input || {}, - output: event.data.output, - success: true, - durationMs: event.data.durationMs, - startedAt: new Date(Date.now() - event.data.durationMs).toISOString(), - executionOrder: event.data.executionOrder, - endedAt: new Date().toISOString(), - workflowId: targetWorkflowId, - blockId: event.data.blockId, - executionId, - blockName: event.data.blockName, - blockType: event.data.blockType, - iterationCurrent: event.data.iterationCurrent, - iterationTotal: event.data.iterationTotal, - iterationType: event.data.iterationType, - iterationContainerId: event.data.iterationContainerId, - childWorkflowBlockId: event.data.childWorkflowBlockId, - childWorkflowName: event.data.childWorkflowName, - childWorkflowInstanceId: event.data.childWorkflowInstanceId, - }) - - if (options.onBlockComplete) { - options.onBlockComplete(event.data.blockId, event.data.output).catch(() => {}) - } + case 'block:completed': + blockHandlers.onBlockCompleted(event.data) break - } - case 'block:error': { - updateActiveBlockRefCount( - activeBlockRefCounts, - activeBlocksSet, - event.data.blockId, - false - ) - setActiveBlocks(wfId, new Set(activeBlocksSet)) - - setBlockRunStatus(wfId, event.data.blockId, 'error') - markOutgoingEdgesFromOutput( - event.data.blockId, - { error: event.data.error }, - workflowEdges, - wfId, - setEdgeRunStatus - ) - - addConsole({ - input: event.data.input || {}, - output: {}, - success: false, - error: event.data.error, - durationMs: event.data.durationMs, - startedAt: new Date(Date.now() - event.data.durationMs).toISOString(), - executionOrder: event.data.executionOrder, - endedAt: new Date().toISOString(), - workflowId: targetWorkflowId, - blockId: event.data.blockId, - executionId, - blockName: event.data.blockName, - blockType: event.data.blockType, - iterationCurrent: event.data.iterationCurrent, - iterationTotal: event.data.iterationTotal, - iterationType: event.data.iterationType, - iterationContainerId: event.data.iterationContainerId, - childWorkflowBlockId: event.data.childWorkflowBlockId, - childWorkflowName: event.data.childWorkflowName, - childWorkflowInstanceId: event.data.childWorkflowInstanceId, - }) + case 'block:error': + blockHandlers.onBlockError(event.data) break - } - case 'block:childWorkflowStarted': { - const { updateConsole } = useTerminalConsoleStore.getState() - updateConsole( - event.data.blockId, - { - childWorkflowInstanceId: event.data.childWorkflowInstanceId, - ...(event.data.iterationCurrent !== undefined && { - iterationCurrent: event.data.iterationCurrent, - }), - ...(event.data.iterationContainerId !== undefined && { - iterationContainerId: event.data.iterationContainerId, - }), - }, - executionId - ) + case 'block:childWorkflowStarted': + blockHandlers.onBlockChildWorkflowStarted(event.data) break - } case 'execution:completed': setCurrentExecutionId(wfId, null)