Skip to content

Commit d640fa0

Browse files
fix(condition): execution with subflow sentinels follow-on, snapshot highlighting, duplicate terminal logs (#3429)
* fix(condition): consecutive error logging + execution dequeuing * fix snapshot highlighting * address minor gaps * fix incomplete case * remove activatedEdges path * cleanup tests * address greptile comments * update tests:
1 parent 28f8e0f commit d640fa0

File tree

11 files changed

+688
-89
lines changed

11 files changed

+688
-89
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import {
2020
TriggerUtils,
2121
} from '@/lib/workflows/triggers/triggers'
2222
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
23-
import { updateActiveBlockRefCount } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
23+
import {
24+
markOutgoingEdgesFromOutput,
25+
updateActiveBlockRefCount,
26+
} from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils'
2427
import { getBlock } from '@/blocks'
2528
import type { SerializableExecutionState } from '@/executor/execution/types'
2629
import type {
@@ -63,7 +66,7 @@ interface DebugValidationResult {
6366
interface BlockEventHandlerConfig {
6467
workflowId?: string
6568
executionIdRef: { current: string }
66-
workflowEdges: Array<{ id: string; target: string; sourceHandle?: string | null }>
69+
workflowEdges: Array<{ id: string; source: string; target: string; sourceHandle?: string | null }>
6770
activeBlocksSet: Set<string>
6871
activeBlockRefCounts: Map<string, number>
6972
accumulatedBlockLogs: BlockLog[]
@@ -335,13 +338,9 @@ export function useWorkflowExecution() {
335338
setActiveBlocks(workflowId, new Set(activeBlocksSet))
336339
}
337340

338-
const markIncomingEdges = (blockId: string) => {
341+
const markOutgoingEdges = (blockId: string, output: Record<string, any> | undefined) => {
339342
if (!workflowId) return
340-
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
341-
incomingEdges.forEach((edge) => {
342-
const status = edge.sourceHandle === 'error' ? 'error' : 'success'
343-
setEdgeRunStatus(workflowId, edge.id, status)
344-
})
343+
markOutgoingEdgesFromOutput(blockId, output, workflowEdges, workflowId, setEdgeRunStatus)
345344
}
346345

347346
const isContainerBlockType = (blockType?: string) => {
@@ -460,7 +459,6 @@ export function useWorkflowExecution() {
460459
const onBlockStarted = (data: BlockStartedData) => {
461460
if (isStaleExecution()) return
462461
updateActiveBlocks(data.blockId, true)
463-
markIncomingEdges(data.blockId)
464462

465463
if (!includeStartConsoleEntry || !workflowId) return
466464

@@ -487,6 +485,7 @@ export function useWorkflowExecution() {
487485
if (isStaleExecution()) return
488486
updateActiveBlocks(data.blockId, false)
489487
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'success')
488+
markOutgoingEdges(data.blockId, data.output as Record<string, any> | undefined)
490489
executedBlockIds.add(data.blockId)
491490
accumulatedBlockStates.set(data.blockId, {
492491
output: data.output,
@@ -505,7 +504,9 @@ export function useWorkflowExecution() {
505504
}
506505

507506
if (isContainerBlockType(data.blockType) && !data.iterationContainerId) {
508-
return
507+
const output = data.output as Record<string, any> | undefined
508+
const isEmptySubflow = Array.isArray(output?.results) && output.results.length === 0
509+
if (!isEmptySubflow) return
509510
}
510511

511512
accumulatedBlockLogs.push(createBlockLogEntry(data, { success: true, output: data.output }))
@@ -527,6 +528,7 @@ export function useWorkflowExecution() {
527528
if (isStaleExecution()) return
528529
updateActiveBlocks(data.blockId, false)
529530
if (workflowId) setBlockRunStatus(workflowId, data.blockId, 'error')
531+
markOutgoingEdges(data.blockId, { error: data.error })
530532

531533
executedBlockIds.add(data.blockId)
532534
accumulatedBlockStates.set(data.blockId, {

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/utils/workflow-execution-utils.ts

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,69 @@ export function updateActiveBlockRefCount(
2929
}
3030
}
3131

32+
/**
33+
* Determines if a workflow edge should be marked as active based on its handle and the block output.
34+
* Mirrors the executor's EdgeManager.shouldActivateEdge logic on the client side.
35+
*/
36+
function shouldActivateEdgeClient(
37+
handle: string | null | undefined,
38+
output: Record<string, any> | undefined
39+
): boolean {
40+
if (!handle) return true
41+
42+
if (handle.startsWith('condition-')) {
43+
return output?.selectedOption === handle.substring('condition-'.length)
44+
}
45+
46+
if (handle.startsWith('router-')) {
47+
return output?.selectedRoute === handle.substring('router-'.length)
48+
}
49+
50+
switch (handle) {
51+
case 'error':
52+
return !!output?.error
53+
case 'source':
54+
return !output?.error
55+
case 'loop-start-source':
56+
case 'loop-end-source':
57+
case 'parallel-start-source':
58+
case 'parallel-end-source':
59+
case 'loop_exit':
60+
case 'loop_continue':
61+
case 'loop_continue_alt':
62+
case 'parallel_exit':
63+
return true
64+
default:
65+
return true
66+
}
67+
}
68+
69+
export function markOutgoingEdgesFromOutput(
70+
blockId: string,
71+
output: Record<string, any> | undefined,
72+
workflowEdges: Array<{
73+
id: string
74+
source: string
75+
target: string
76+
sourceHandle?: string | null
77+
}>,
78+
workflowId: string,
79+
setEdgeRunStatus: (wfId: string, edgeId: string, status: 'success' | 'error') => void
80+
): void {
81+
const outgoing = workflowEdges.filter((edge) => edge.source === blockId)
82+
for (const edge of outgoing) {
83+
const handle = edge.sourceHandle
84+
if (!handle) {
85+
setEdgeRunStatus(workflowId, edge.id, 'success')
86+
continue
87+
}
88+
if (shouldActivateEdgeClient(handle, output)) {
89+
const status = handle === 'error' ? 'error' : 'success'
90+
setEdgeRunStatus(workflowId, edge.id, status)
91+
}
92+
}
93+
}
94+
3295
export interface WorkflowExecutionOptions {
3396
workflowInput?: any
3497
onStream?: (se: StreamingExecution) => Promise<void>
@@ -135,13 +198,6 @@ export async function executeWorkflowWithFullLogging(
135198
true
136199
)
137200
setActiveBlocks(wfId, new Set(activeBlocksSet))
138-
139-
const incomingEdges = workflowEdges.filter(
140-
(edge) => edge.target === event.data.blockId
141-
)
142-
incomingEdges.forEach((edge) => {
143-
setEdgeRunStatus(wfId, edge.id, 'success')
144-
})
145201
break
146202
}
147203

@@ -155,6 +211,13 @@ export async function executeWorkflowWithFullLogging(
155211
setActiveBlocks(wfId, new Set(activeBlocksSet))
156212

157213
setBlockRunStatus(wfId, event.data.blockId, 'success')
214+
markOutgoingEdgesFromOutput(
215+
event.data.blockId,
216+
event.data.output,
217+
workflowEdges,
218+
wfId,
219+
setEdgeRunStatus
220+
)
158221

159222
addConsole({
160223
input: event.data.input || {},
@@ -194,6 +257,13 @@ export async function executeWorkflowWithFullLogging(
194257
setActiveBlocks(wfId, new Set(activeBlocksSet))
195258

196259
setBlockRunStatus(wfId, event.data.blockId, 'error')
260+
markOutgoingEdgesFromOutput(
261+
event.data.blockId,
262+
{ error: event.data.error },
263+
workflowEdges,
264+
wfId,
265+
setEdgeRunStatus
266+
)
197267

198268
addConsole({
199269
input: event.data.input || {},

apps/sim/app/workspace/[workspaceId]/w/components/preview/components/preview-workflow/preview-workflow.tsx

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ interface PreviewWorkflowProps {
145145
/** Cursor style to show when hovering the canvas */
146146
cursorStyle?: 'default' | 'pointer' | 'grab'
147147
/** Map of executed block IDs to their status for highlighting the execution path */
148-
executedBlocks?: Record<string, { status: string }>
148+
executedBlocks?: Record<string, { status: string; output?: unknown }>
149149
/** Currently selected block ID for highlighting */
150150
selectedBlockId?: string | null
151151
/** Skips expensive subblock computations for thumbnails/template previews */
@@ -274,9 +274,9 @@ export function PreviewWorkflow({
274274

275275
/** Maps base block IDs to execution data, handling parallel iteration variants (blockId₍n₎). */
276276
const blockExecutionMap = useMemo(() => {
277-
if (!executedBlocks) return new Map<string, { status: string }>()
277+
if (!executedBlocks) return new Map<string, { status: string; output?: unknown }>()
278278

279-
const map = new Map<string, { status: string }>()
279+
const map = new Map<string, { status: string; output?: unknown }>()
280280
for (const [key, value] of Object.entries(executedBlocks)) {
281281
// Extract base ID (remove iteration suffix like ₍0₎)
282282
const baseId = key.includes('₍') ? key.split('₍')[0] : key
@@ -451,7 +451,6 @@ export function PreviewWorkflow({
451451
const edges: Edge[] = useMemo(() => {
452452
if (!isValidWorkflowState) return []
453453

454-
/** Edge is green if target executed and source condition met by edge type. */
455454
const getEdgeExecutionStatus = (edge: {
456455
source: string
457456
target: string
@@ -463,17 +462,40 @@ export function PreviewWorkflow({
463462
if (!targetStatus?.executed) return 'not-executed'
464463

465464
const sourceStatus = getBlockExecutionStatus(edge.source)
466-
const { sourceHandle } = edge
465+
if (!sourceStatus?.executed) return 'not-executed'
467466

468-
if (sourceHandle === 'error') {
469-
return sourceStatus?.status === 'error' ? 'success' : 'not-executed'
467+
const handle = edge.sourceHandle
468+
if (!handle) {
469+
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
470470
}
471471

472-
if (sourceHandle === 'loop-start-source' || sourceHandle === 'parallel-start-source') {
473-
return 'success'
472+
const sourceOutput = blockExecutionMap.get(edge.source)?.output as
473+
| Record<string, any>
474+
| undefined
475+
476+
if (handle.startsWith('condition-')) {
477+
const conditionValue = handle.substring('condition-'.length)
478+
return sourceOutput?.selectedOption === conditionValue ? 'success' : 'not-executed'
479+
}
480+
481+
if (handle.startsWith('router-')) {
482+
const routeId = handle.substring('router-'.length)
483+
return sourceOutput?.selectedRoute === routeId ? 'success' : 'not-executed'
474484
}
475485

476-
return sourceStatus?.status === 'success' ? 'success' : 'not-executed'
486+
switch (handle) {
487+
case 'error':
488+
return sourceStatus.status === 'error' ? 'error' : 'not-executed'
489+
case 'source':
490+
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
491+
case 'loop-start-source':
492+
case 'loop-end-source':
493+
case 'parallel-start-source':
494+
case 'parallel-end-source':
495+
return 'success'
496+
default:
497+
return sourceStatus.status === 'success' ? 'success' : 'not-executed'
498+
}
477499
}
478500

479501
return (workflowState.edges || []).map((edge) => {

0 commit comments

Comments
 (0)