Skip to content

Commit c3f5d77

Browse files
author
test
committed
fix(executor): await subflow diagnostics callbacks
Ensure empty-subflow and subflow-error lifecycle callbacks participate in progress-write draining before terminal finalization while still swallowing callback failures.
1 parent b4f0ced commit c3f5d77

File tree

5 files changed

+198
-54
lines changed

5 files changed

+198
-54
lines changed

apps/sim/executor/orchestrators/loop.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class LoopOrchestrator {
5151
private edgeManager: EdgeManager | null = null
5252
) {}
5353

54-
initializeLoopScope(ctx: ExecutionContext, loopId: string): LoopScope {
54+
async initializeLoopScope(ctx: ExecutionContext, loopId: string): Promise<LoopScope> {
5555
const loopConfig = this.dag.loopConfigs.get(loopId) as SerializedLoop | undefined
5656
if (!loopConfig) {
5757
throw new Error(`Loop config not found: ${loopId}`)
@@ -76,7 +76,7 @@ export class LoopOrchestrator {
7676
)
7777
if (iterationError) {
7878
logger.error(iterationError, { loopId, requestedIterations })
79-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
79+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
8080
iterations: requestedIterations,
8181
})
8282
scope.maxIterations = 0
@@ -99,7 +99,7 @@ export class LoopOrchestrator {
9999
} catch (error) {
100100
const errorMessage = `ForEach loop resolution failed: ${error instanceof Error ? error.message : String(error)}`
101101
logger.error(errorMessage, { loopId, forEachItems: loopConfig.forEachItems })
102-
this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
102+
await this.addLoopErrorLog(ctx, loopId, loopType, errorMessage, {
103103
forEachItems: loopConfig.forEachItems,
104104
})
105105
scope.items = []
@@ -117,7 +117,7 @@ export class LoopOrchestrator {
117117
)
118118
if (sizeError) {
119119
logger.error(sizeError, { loopId, collectionSize: items.length })
120-
this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
120+
await this.addLoopErrorLog(ctx, loopId, loopType, sizeError, {
121121
forEachItems: loopConfig.forEachItems,
122122
collectionSize: items.length,
123123
})
@@ -155,7 +155,7 @@ export class LoopOrchestrator {
155155
)
156156
if (iterationError) {
157157
logger.error(iterationError, { loopId, requestedIterations })
158-
this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
158+
await this.addLoopErrorLog(ctx, loopId, loopType, iterationError, {
159159
iterations: requestedIterations,
160160
})
161161
scope.maxIterations = 0
@@ -182,14 +182,14 @@ export class LoopOrchestrator {
182182
return scope
183183
}
184184

185-
private addLoopErrorLog(
185+
private async addLoopErrorLog(
186186
ctx: ExecutionContext,
187187
loopId: string,
188188
loopType: string,
189189
errorMessage: string,
190190
inputData?: any
191-
): void {
192-
addSubflowErrorLog(
191+
): Promise<void> {
192+
await addSubflowErrorLog(
193193
ctx,
194194
loopId,
195195
'loop',
@@ -604,7 +604,7 @@ export class LoopOrchestrator {
604604
if (!scope.items || scope.items.length === 0) {
605605
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
606606
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
607-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
607+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
608608
return false
609609
}
610610
return true
@@ -614,7 +614,7 @@ export class LoopOrchestrator {
614614
if (scope.maxIterations === 0) {
615615
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
616616
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
617-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
617+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
618618
return false
619619
}
620620
return true
@@ -628,7 +628,7 @@ export class LoopOrchestrator {
628628
if (!scope.condition) {
629629
logger.warn('No condition defined for while loop', { loopId })
630630
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
631-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
631+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
632632
return false
633633
}
634634

@@ -641,7 +641,7 @@ export class LoopOrchestrator {
641641

642642
if (!result) {
643643
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
644-
emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
644+
await emitEmptySubflowEvents(ctx, loopId, 'loop', this.contextExtensions)
645645
}
646646

647647
return result

apps/sim/executor/orchestrators/node.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@ export class NodeExecutionOrchestrator {
5353

5454
const loopId = node.metadata.loopId
5555
if (loopId && !this.loopOrchestrator.getLoopScope(ctx, loopId)) {
56-
this.loopOrchestrator.initializeLoopScope(ctx, loopId)
56+
await this.loopOrchestrator.initializeLoopScope(ctx, loopId)
5757
}
5858

5959
const parallelId = node.metadata.parallelId
6060
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
6161
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
6262
const nodesInParallel = parallelConfig?.nodes?.length || 1
63-
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
63+
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
6464
}
6565

6666
if (node.metadata.isSentinel) {
@@ -158,7 +158,7 @@ export class NodeExecutionOrchestrator {
158158
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
159159
if (parallelConfig) {
160160
const nodesInParallel = parallelConfig.nodes?.length || 1
161-
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
161+
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
162162
}
163163
}
164164

@@ -239,7 +239,7 @@ export class NodeExecutionOrchestrator {
239239
if (!scope) {
240240
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
241241
const nodesInParallel = parallelConfig?.nodes?.length || 1
242-
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
242+
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
243243
}
244244
const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion(
245245
ctx,
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
import { beforeEach, describe, expect, it, vi } from 'vitest'
5+
import type { DAG } from '@/executor/dag/builder'
6+
import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types'
7+
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
8+
import type { ExecutionContext } from '@/executor/types'
9+
10+
vi.mock('@sim/logger', () => ({
11+
createLogger: () => ({
12+
info: vi.fn(),
13+
warn: vi.fn(),
14+
error: vi.fn(),
15+
debug: vi.fn(),
16+
}),
17+
}))
18+
19+
function createDag(): DAG {
20+
return {
21+
nodes: new Map(),
22+
loopConfigs: new Map(),
23+
parallelConfigs: new Map([
24+
[
25+
'parallel-1',
26+
{
27+
id: 'parallel-1',
28+
nodes: ['task-1'],
29+
distribution: [],
30+
parallelType: 'collection',
31+
},
32+
],
33+
]),
34+
}
35+
}
36+
37+
function createState(): BlockStateWriter {
38+
return {
39+
setBlockOutput: vi.fn(),
40+
setBlockState: vi.fn(),
41+
deleteBlockState: vi.fn(),
42+
unmarkExecuted: vi.fn(),
43+
}
44+
}
45+
46+
function createContext(overrides: Partial<ExecutionContext> = {}): ExecutionContext {
47+
return {
48+
workflowId: 'workflow-1',
49+
workspaceId: 'workspace-1',
50+
executionId: 'execution-1',
51+
userId: 'user-1',
52+
blockStates: new Map(),
53+
executedBlocks: new Set(),
54+
blockLogs: [],
55+
metadata: { duration: 0 },
56+
environmentVariables: {},
57+
decisions: {
58+
router: new Map(),
59+
condition: new Map(),
60+
},
61+
completedLoops: new Set(),
62+
activeExecutionPath: new Set(),
63+
workflow: {
64+
version: '1',
65+
blocks: [
66+
{
67+
id: 'parallel-1',
68+
position: { x: 0, y: 0 },
69+
config: { tool: '', params: {} },
70+
inputs: {},
71+
outputs: {},
72+
metadata: { id: 'parallel', name: 'Parallel 1' },
73+
enabled: true,
74+
},
75+
],
76+
connections: [],
77+
loops: {},
78+
parallels: {},
79+
},
80+
...overrides,
81+
}
82+
}
83+
84+
describe('ParallelOrchestrator', () => {
85+
beforeEach(() => {
86+
vi.clearAllMocks()
87+
})
88+
89+
it('awaits empty-subflow lifecycle callbacks before returning the empty scope', async () => {
90+
let releaseStart: (() => void) | undefined
91+
const onBlockStart = vi.fn(
92+
() =>
93+
new Promise<void>((resolve) => {
94+
releaseStart = resolve
95+
})
96+
)
97+
const onBlockComplete = vi.fn()
98+
const contextExtensions: ContextExtensions = {
99+
onBlockStart,
100+
onBlockComplete,
101+
}
102+
const orchestrator = new ParallelOrchestrator(
103+
createDag(),
104+
createState(),
105+
null,
106+
contextExtensions
107+
)
108+
const ctx = createContext()
109+
110+
const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1)
111+
await Promise.resolve()
112+
113+
expect(onBlockStart).toHaveBeenCalledTimes(1)
114+
expect(onBlockComplete).not.toHaveBeenCalled()
115+
116+
releaseStart?.()
117+
const scope = await initializePromise
118+
119+
expect(onBlockComplete).toHaveBeenCalledTimes(1)
120+
expect(scope.isEmpty).toBe(true)
121+
})
122+
123+
it('swallows helper callback failures on empty parallel paths', async () => {
124+
const contextExtensions: ContextExtensions = {
125+
onBlockStart: vi.fn().mockRejectedValue(new Error('start failed')),
126+
onBlockComplete: vi.fn().mockRejectedValue(new Error('complete failed')),
127+
}
128+
const orchestrator = new ParallelOrchestrator(
129+
createDag(),
130+
createState(),
131+
null,
132+
contextExtensions
133+
)
134+
135+
await expect(
136+
orchestrator.initializeParallelScope(createContext(), 'parallel-1', 1)
137+
).resolves.toMatchObject({
138+
parallelId: 'parallel-1',
139+
isEmpty: true,
140+
})
141+
})
142+
})

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ export class ParallelOrchestrator {
4747
private contextExtensions: ContextExtensions | null = null
4848
) {}
4949

50-
initializeParallelScope(
50+
async initializeParallelScope(
5151
ctx: ExecutionContext,
5252
parallelId: string,
5353
terminalNodesCount = 1
54-
): ParallelScope {
54+
): Promise<ParallelScope> {
5555
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
5656
if (!parallelConfig) {
5757
throw new Error(`Parallel config not found: ${parallelId}`)
@@ -69,7 +69,7 @@ export class ParallelOrchestrator {
6969
} catch (error) {
7070
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
7171
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
72-
this.addParallelErrorLog(ctx, parallelId, errorMessage, {
72+
await this.addParallelErrorLog(ctx, parallelId, errorMessage, {
7373
distribution: parallelConfig.distribution,
7474
})
7575
this.setErrorScope(ctx, parallelId, errorMessage)
@@ -83,7 +83,7 @@ export class ParallelOrchestrator {
8383
)
8484
if (branchError) {
8585
logger.error(branchError, { parallelId, branchCount })
86-
this.addParallelErrorLog(ctx, parallelId, branchError, {
86+
await this.addParallelErrorLog(ctx, parallelId, branchError, {
8787
distribution: parallelConfig.distribution,
8888
branchCount,
8989
})
@@ -109,7 +109,7 @@ export class ParallelOrchestrator {
109109

110110
this.state.setBlockOutput(parallelId, { results: [] })
111111

112-
emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)
112+
await emitEmptySubflowEvents(ctx, parallelId, 'parallel', this.contextExtensions)
113113

114114
logger.info('Parallel scope initialized with empty distribution, skipping body', {
115115
parallelId,
@@ -220,13 +220,13 @@ export class ParallelOrchestrator {
220220
return { branchCount: items.length, items }
221221
}
222222

223-
private addParallelErrorLog(
223+
private async addParallelErrorLog(
224224
ctx: ExecutionContext,
225225
parallelId: string,
226226
errorMessage: string,
227227
inputData?: any
228-
): void {
229-
addSubflowErrorLog(
228+
): Promise<void> {
229+
await addSubflowErrorLog(
230230
ctx,
231231
parallelId,
232232
'parallel',

0 commit comments

Comments
 (0)