Skip to content

Commit 0ba8c88

Browse files
waleedlatif1claude
andcommitted
improvement(executor): clean up nested subflow implementation
- Fix wireSentinelEdges to use LOOP_EXIT handle for nested loop terminals - Extract buildExecutionPipeline to deduplicate orchestrator wiring - Replace two-phase init with constructor injection for Loop/ParallelOrchestrator - Remove dead code: shouldExecuteLoopNode, resolveForEachItems, isLoopNode, isParallelNode, isSubflowBlockType - Deduplicate currentItem resolution in ParallelResolver via resolveCurrentItem - Type getDistributionItems param as SerializedParallel instead of any - Demote verbose per-reference logger.info to logger.debug in evaluateWhileCondition - Add loop-in-parallel wiring test in edges.test.ts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bb96a6e commit 0ba8c88

File tree

9 files changed

+183
-150
lines changed

9 files changed

+183
-150
lines changed

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/terminal/utils.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,6 @@ export function isEventFromEditableElement(e: KeyboardEvent): boolean {
125125
return false
126126
}
127127

128-
/**
129-
* Checks if a block type is a subflow (loop or parallel)
130-
*/
131-
export function isSubflowBlockType(blockType: string): boolean {
132-
const lower = blockType?.toLowerCase() || ''
133-
return lower === 'loop' || lower === 'parallel'
134-
}
135-
136128
/**
137129
* Node type for the tree structure
138130
*/

apps/sim/executor/dag/construction/edges.test.ts

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,5 +1574,123 @@ describe('EdgeConstructor', () => {
15741574
Array.from(funcNode.outgoingEdges.values()).some((e) => e.target === parallelSentinelEnd)
15751575
).toBe(true)
15761576
})
1577+
1578+
it('should wire loop-in-parallel with correct exit handles', () => {
1579+
const outerParallelId = 'outer-parallel'
1580+
const innerLoopId = 'inner-loop'
1581+
const functionId = 'func-1'
1582+
1583+
const outerSentinelStart = `parallel-${outerParallelId}-sentinel-start`
1584+
const outerSentinelEnd = `parallel-${outerParallelId}-sentinel-end`
1585+
const innerSentinelStart = `loop-${innerLoopId}-sentinel-start`
1586+
const innerSentinelEnd = `loop-${innerLoopId}-sentinel-end`
1587+
1588+
const dag = createMockDAG([
1589+
outerSentinelStart,
1590+
outerSentinelEnd,
1591+
innerSentinelStart,
1592+
innerSentinelEnd,
1593+
functionId,
1594+
])
1595+
1596+
dag.nodes.get(outerSentinelStart)!.metadata = {
1597+
isSentinel: true,
1598+
isParallelSentinel: true,
1599+
sentinelType: 'start',
1600+
parallelId: outerParallelId,
1601+
}
1602+
dag.nodes.get(outerSentinelEnd)!.metadata = {
1603+
isSentinel: true,
1604+
isParallelSentinel: true,
1605+
sentinelType: 'end',
1606+
parallelId: outerParallelId,
1607+
}
1608+
dag.nodes.get(innerSentinelStart)!.metadata = {
1609+
isSentinel: true,
1610+
sentinelType: 'start',
1611+
loopId: innerLoopId,
1612+
}
1613+
dag.nodes.get(innerSentinelEnd)!.metadata = {
1614+
isSentinel: true,
1615+
sentinelType: 'end',
1616+
loopId: innerLoopId,
1617+
}
1618+
1619+
const innerLoop: SerializedLoop = {
1620+
id: innerLoopId,
1621+
nodes: [functionId],
1622+
iterations: 3,
1623+
loopType: 'for',
1624+
}
1625+
1626+
dag.loopConfigs.set(innerLoopId, innerLoop)
1627+
dag.parallelConfigs.set(outerParallelId, {
1628+
id: outerParallelId,
1629+
nodes: [innerLoopId],
1630+
count: 2,
1631+
parallelType: 'count',
1632+
})
1633+
1634+
const workflow = createMockWorkflow(
1635+
[createMockBlock(functionId), createMockBlock(innerLoopId, 'loop')],
1636+
[
1637+
{
1638+
source: outerParallelId,
1639+
target: innerLoopId,
1640+
sourceHandle: 'parallel-start-source',
1641+
},
1642+
{
1643+
source: innerLoopId,
1644+
target: functionId,
1645+
sourceHandle: 'loop-start-source',
1646+
},
1647+
],
1648+
{ [innerLoopId]: innerLoop }
1649+
)
1650+
1651+
const edgeConstructor = new EdgeConstructor()
1652+
edgeConstructor.execute(
1653+
workflow,
1654+
dag,
1655+
new Set([innerLoopId]),
1656+
new Set([functionId]),
1657+
new Set([outerParallelId, innerLoopId, functionId]),
1658+
new Map()
1659+
)
1660+
1661+
// Outer sentinel-start → inner loop sentinel-start
1662+
const outerStartNode = dag.nodes.get(outerSentinelStart)!
1663+
const edgeToInnerStart = Array.from(outerStartNode.outgoingEdges.values()).find(
1664+
(e) => e.target === innerSentinelStart
1665+
)
1666+
expect(edgeToInnerStart).toBeDefined()
1667+
1668+
// Inner loop sentinel-end → outer parallel sentinel-end with loop_exit handle
1669+
const innerEndNode = dag.nodes.get(innerSentinelEnd)!
1670+
const edgeToOuterEnd = Array.from(innerEndNode.outgoingEdges.values()).find(
1671+
(e) => e.target === outerSentinelEnd
1672+
)
1673+
expect(edgeToOuterEnd).toBeDefined()
1674+
expect(edgeToOuterEnd!.sourceHandle).toBe('loop_exit')
1675+
1676+
// Inner loop back-edge: sentinel-end → sentinel-start with loop_continue handle
1677+
const backEdge = Array.from(innerEndNode.outgoingEdges.values()).find(
1678+
(e) => e.target === innerSentinelStart
1679+
)
1680+
expect(backEdge).toBeDefined()
1681+
expect(backEdge!.sourceHandle).toBe('loop_continue')
1682+
1683+
// Inner loop wiring: sentinel-start → function
1684+
const innerStartNode = dag.nodes.get(innerSentinelStart)!
1685+
expect(
1686+
Array.from(innerStartNode.outgoingEdges.values()).some((e) => e.target === functionId)
1687+
).toBe(true)
1688+
1689+
// Function → inner loop sentinel-end
1690+
const funcNode = dag.nodes.get(functionId)!
1691+
expect(
1692+
Array.from(funcNode.outgoingEdges.values()).some((e) => e.target === innerSentinelEnd)
1693+
).toBe(true)
1694+
})
15771695
})
15781696
})

apps/sim/executor/execution/executor.ts

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,24 +71,7 @@ export class DAGExecutor {
7171
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
7272
context.subflowParentMap = this.buildSubflowParentMap(dag)
7373

74-
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
75-
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
76-
loopOrchestrator.setContextExtensions(this.contextExtensions)
77-
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
78-
parallelOrchestrator.setResolver(resolver)
79-
parallelOrchestrator.setContextExtensions(this.contextExtensions)
80-
const allHandlers = createBlockHandlers()
81-
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
82-
const edgeManager = new EdgeManager(dag)
83-
loopOrchestrator.setEdgeManager(edgeManager)
84-
const nodeOrchestrator = new NodeExecutionOrchestrator(
85-
dag,
86-
state,
87-
blockExecutor,
88-
loopOrchestrator,
89-
parallelOrchestrator
90-
)
91-
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
74+
const engine = this.buildExecutionPipeline(context, dag, state)
9275
return await engine.run(triggerBlockId)
9376
}
9477

@@ -213,26 +196,36 @@ export class DAGExecutor {
213196
})
214197
context.subflowParentMap = this.buildSubflowParentMap(dag)
215198

199+
const engine = this.buildExecutionPipeline(context, dag, state)
200+
return await engine.run()
201+
}
202+
203+
private buildExecutionPipeline(context: ExecutionContext, dag: DAG, state: ExecutionState) {
216204
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
217-
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
218-
loopOrchestrator.setContextExtensions(this.contextExtensions)
219-
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
220-
parallelOrchestrator.setResolver(resolver)
221-
parallelOrchestrator.setContextExtensions(this.contextExtensions)
222205
const allHandlers = createBlockHandlers()
223206
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
224207
const edgeManager = new EdgeManager(dag)
225-
loopOrchestrator.setEdgeManager(edgeManager)
208+
const loopOrchestrator = new LoopOrchestrator(
209+
dag,
210+
state,
211+
resolver,
212+
this.contextExtensions,
213+
edgeManager
214+
)
215+
const parallelOrchestrator = new ParallelOrchestrator(
216+
dag,
217+
state,
218+
resolver,
219+
this.contextExtensions
220+
)
226221
const nodeOrchestrator = new NodeExecutionOrchestrator(
227222
dag,
228223
state,
229224
blockExecutor,
230225
loopOrchestrator,
231226
parallelOrchestrator
232227
)
233-
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
234-
235-
return await engine.run()
228+
return new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
236229
}
237230

238231
private createExecutionContext(

apps/sim/executor/orchestrators/loop.ts

Lines changed: 8 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,14 @@ export interface LoopContinuationResult {
4242
}
4343

4444
export class LoopOrchestrator {
45-
private edgeManager: EdgeManager | null = null
46-
private contextExtensions: ContextExtensions | null = null
47-
4845
constructor(
4946
private dag: DAG,
5047
private state: BlockStateController,
51-
private resolver: VariableResolver
48+
private resolver: VariableResolver,
49+
private contextExtensions: ContextExtensions | null = null,
50+
private edgeManager: EdgeManager | null = null
5251
) {}
5352

54-
setContextExtensions(contextExtensions: ContextExtensions): void {
55-
this.contextExtensions = contextExtensions
56-
}
57-
58-
setEdgeManager(edgeManager: EdgeManager): void {
59-
this.edgeManager = edgeManager
60-
}
61-
6253
initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
6354
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
6455
if (!loopConfig) {
@@ -103,7 +94,7 @@ export class LoopOrchestrator {
10394
scope.loopType = 'forEach'
10495
let items: any[]
10596
try {
106-
items = this.resolveForEachItems(ctx, loopConfig.forEachItems)
97+
items = resolveArrayInput(ctx, loopConfig.forEachItems, this.resolver)
10798
} catch (error) {
10899
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
109100
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
@@ -359,9 +350,8 @@ export class LoopOrchestrator {
359350
for (const nodeId of loopConfig.nodes) {
360351
if (this.dag.loopConfigs.has(nodeId)) {
361352
ctx.loopExecutions?.delete(nodeId)
362-
// Delete cloned loop variants (e.g., inner-loop__obranch-N)
363-
// Only delete clone entries from subflowParentMap, never the original
364-
// structural entries which are needed for SSE iteration context.
353+
// Delete cloned loop variants (e.g., inner-loop__obranch-N) but not original
354+
// subflowParentMap entries which are needed for SSE iteration context.
365355
if (ctx.loopExecutions) {
366356
const prefix = `${nodeId}__obranch-`
367357
for (const key of ctx.loopExecutions.keys()) {
@@ -400,11 +390,7 @@ export class LoopOrchestrator {
400390
*/
401391
private deleteParallelScopeAndClones(parallelId: string, ctx: ExecutionContext): void {
402392
ctx.parallelExecutions?.delete(parallelId)
403-
// Do NOT delete the original entry from subflowParentMap — it is a static
404-
// structural mapping needed for SSE iteration context on subsequent loop
405-
// iterations. Only delete __obranch-N clone entries below.
406-
407-
// Delete any cloned scopes (e.g., inner-parallel__obranch-1)
393+
// Delete cloned scopes (__obranch-N) but not original subflowParentMap entries
408394
if (ctx.parallelExecutions) {
409395
const prefix = `${parallelId}__obranch-`
410396
for (const key of ctx.parallelExecutions.keys()) {
@@ -415,7 +401,6 @@ export class LoopOrchestrator {
415401
}
416402
}
417403

418-
// Recurse into nested subflows within this parallel
419404
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
420405
if (parallelConfig?.nodes) {
421406
for (const nodeId of parallelConfig.nodes) {
@@ -483,17 +468,14 @@ export class LoopOrchestrator {
483468
for (const id of this.collectAllLoopNodeIds(nodeId, visited)) {
484469
result.add(id)
485470
}
486-
// Also collect cloned loop variants (e.g., loop-1__obranch-N)
487471
this.collectClonedSubflowNodes(nodeId, result, visited)
488472
} else if (this.dag.parallelConfigs.has(nodeId)) {
489473
for (const id of this.collectAllParallelNodeIds(nodeId, visited)) {
490474
result.add(id)
491475
}
492-
// Also collect cloned parallel variants
493476
this.collectClonedSubflowNodes(nodeId, result, visited)
494477
} else {
495478
result.add(nodeId)
496-
// Collect ALL dynamic branch nodes (not just branch 0)
497479
this.collectAllBranchNodes(nodeId, result)
498480
}
499481
}
@@ -605,23 +587,19 @@ export class LoopOrchestrator {
605587
return true
606588
}
607589

608-
// for: skip if maxIterations is 0
609590
if (scope.loopType === 'for') {
610591
if (scope.maxIterations === 0) {
611592
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
612-
// Set empty output for the loop
613593
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
614594
return false
615595
}
616596
return true
617597
}
618598

619-
// doWhile: always execute at least once
620599
if (scope.loopType === 'doWhile') {
621600
return true
622601
}
623602

624-
// while: check condition before first iteration
625603
if (scope.loopType === 'while') {
626604
if (!scope.condition) {
627605
logger.warn('No condition defined for while loop', { loopId })
@@ -641,10 +619,6 @@ export class LoopOrchestrator {
641619
return true
642620
}
643621

644-
shouldExecuteLoopNode(_ctx: ExecutionContext, _nodeId: string, _loopId: string): boolean {
645-
return true
646-
}
647-
648622
private async evaluateWhileCondition(
649623
ctx: ExecutionContext,
650624
condition: string,
@@ -663,10 +637,9 @@ export class LoopOrchestrator {
663637

664638
const evaluatedCondition = replaceValidReferences(condition, (match) => {
665639
const resolved = this.resolver.resolveSingleReference(ctx, '', match, scope)
666-
logger.info('Resolved variable reference in loop condition', {
640+
logger.debug('Resolved variable reference in loop condition', {
667641
reference: match,
668642
resolvedValue: resolved,
669-
resolvedType: typeof resolved,
670643
})
671644
if (resolved !== undefined) {
672645
if (typeof resolved === 'boolean' || typeof resolved === 'number') {
@@ -721,8 +694,4 @@ export class LoopOrchestrator {
721694
return false
722695
}
723696
}
724-
725-
private resolveForEachItems(ctx: ExecutionContext, items: any): any[] {
726-
return resolveArrayInput(ctx, items, this.resolver)
727-
}
728697
}

apps/sim/executor/orchestrators/node.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,6 @@ export class NodeExecutionOrchestrator {
5656
this.loopOrchestrator.initializeLoopScope(ctx, loopId)
5757
}
5858

59-
if (loopId && !this.loopOrchestrator.shouldExecuteLoopNode(ctx, nodeId, loopId)) {
60-
return {
61-
nodeId,
62-
output: {},
63-
isFinalOutput: false,
64-
}
65-
}
66-
6759
const parallelId = node.metadata.parallelId
6860
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
6961
const parallelConfig = this.dag.parallelConfigs.get(parallelId)

0 commit comments

Comments
 (0)