Skip to content

Commit 2901db4

Browse files
author
test
committed
fix(executor): preserve durable diagnostics ordering
Await only the persistence needed to keep diagnostics durable before terminal completion while keeping callback failures from changing execution behavior.
1 parent 1e2fbab commit 2901db4

File tree

7 files changed

+440
-138
lines changed

7 files changed

+440
-138
lines changed

apps/sim/executor/execution/block-executor.ts

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ export class BlockExecutor {
7777
if (!isSentinel) {
7878
blockLog = this.createBlockLog(ctx, node.id, block, node)
7979
ctx.blockLogs.push(blockLog)
80-
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
80+
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
8181
}
8282

8383
const startTime = performance.now()
@@ -179,7 +179,7 @@ export class BlockExecutor {
179179
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
180180
block,
181181
})
182-
this.callOnBlockComplete(
182+
await this.callOnBlockComplete(
183183
ctx,
184184
node,
185185
block,
@@ -195,7 +195,7 @@ export class BlockExecutor {
195195

196196
return normalizedOutput
197197
} catch (error) {
198-
return this.handleBlockError(
198+
return await this.handleBlockError(
199199
error,
200200
ctx,
201201
node,
@@ -226,7 +226,7 @@ export class BlockExecutor {
226226
return this.blockHandlers.find((h) => h.canHandle(block))
227227
}
228228

229-
private handleBlockError(
229+
private async handleBlockError(
230230
error: unknown,
231231
ctx: ExecutionContext,
232232
node: DAGNode,
@@ -236,7 +236,7 @@ export class BlockExecutor {
236236
resolvedInputs: Record<string, any>,
237237
isSentinel: boolean,
238238
phase: 'input_resolution' | 'execution'
239-
): NormalizedBlockOutput {
239+
): Promise<NormalizedBlockOutput> {
240240
const duration = performance.now() - startTime
241241
const errorMessage = normalizeError(error)
242242
const hasResolvedInputs =
@@ -287,7 +287,7 @@ export class BlockExecutor {
287287
? error.childWorkflowInstanceId
288288
: undefined
289289
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
290-
this.callOnBlockComplete(
290+
await this.callOnBlockComplete(
291291
ctx,
292292
node,
293293
block,
@@ -439,31 +439,39 @@ export class BlockExecutor {
439439
return redactApiKeys(result)
440440
}
441441

442-
private callOnBlockStart(
442+
private async callOnBlockStart(
443443
ctx: ExecutionContext,
444444
node: DAGNode,
445445
block: SerializedBlock,
446446
executionOrder: number
447-
): void {
447+
): Promise<void> {
448448
const blockId = node.metadata?.originalBlockId ?? node.id
449449
const blockName = block.metadata?.name ?? blockId
450450
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
451451

452452
const iterationContext = getIterationContext(ctx, node?.metadata)
453453

454454
if (this.contextExtensions.onBlockStart) {
455-
this.contextExtensions.onBlockStart(
456-
blockId,
457-
blockName,
458-
blockType,
459-
executionOrder,
460-
iterationContext,
461-
ctx.childWorkflowContext
462-
)
455+
try {
456+
await this.contextExtensions.onBlockStart(
457+
blockId,
458+
blockName,
459+
blockType,
460+
executionOrder,
461+
iterationContext,
462+
ctx.childWorkflowContext
463+
)
464+
} catch (error) {
465+
logger.warn('Block start callback failed', {
466+
blockId,
467+
blockType,
468+
error: error instanceof Error ? error.message : String(error),
469+
})
470+
}
463471
}
464472
}
465473

466-
private callOnBlockComplete(
474+
private async callOnBlockComplete(
467475
ctx: ExecutionContext,
468476
node: DAGNode,
469477
block: SerializedBlock,
@@ -474,30 +482,38 @@ export class BlockExecutor {
474482
executionOrder: number,
475483
endedAt: string,
476484
childWorkflowInstanceId?: string
477-
): void {
485+
): Promise<void> {
478486
const blockId = node.metadata?.originalBlockId ?? node.id
479487
const blockName = block.metadata?.name ?? blockId
480488
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE
481489

482490
const iterationContext = getIterationContext(ctx, node?.metadata)
483491

484492
if (this.contextExtensions.onBlockComplete) {
485-
this.contextExtensions.onBlockComplete(
486-
blockId,
487-
blockName,
488-
blockType,
489-
{
490-
input,
491-
output,
492-
executionTime: duration,
493-
startedAt,
494-
executionOrder,
495-
endedAt,
496-
childWorkflowInstanceId,
497-
},
498-
iterationContext,
499-
ctx.childWorkflowContext
500-
)
493+
try {
494+
await this.contextExtensions.onBlockComplete(
495+
blockId,
496+
blockName,
497+
blockType,
498+
{
499+
input,
500+
output,
501+
executionTime: duration,
502+
startedAt,
503+
executionOrder,
504+
endedAt,
505+
childWorkflowInstanceId,
506+
},
507+
iterationContext,
508+
ctx.childWorkflowContext
509+
)
510+
} catch (error) {
511+
logger.warn('Block completion callback failed', {
512+
blockId,
513+
blockType,
514+
error: error instanceof Error ? error.message : String(error),
515+
})
516+
}
501517
}
502518
}
503519

apps/sim/executor/orchestrators/loop.ts

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ export class LoopOrchestrator {
238238
}
239239
if (isCancelled) {
240240
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
241-
return this.createExitResult(ctx, loopId, scope)
241+
return await this.createExitResult(ctx, loopId, scope)
242242
}
243243

244244
const iterationResults: NormalizedBlockOutput[] = []
@@ -253,7 +253,7 @@ export class LoopOrchestrator {
253253
scope.currentIterationOutputs.clear()
254254

255255
if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
256-
return this.createExitResult(ctx, loopId, scope)
256+
return await this.createExitResult(ctx, loopId, scope)
257257
}
258258

259259
scope.iteration++
@@ -269,11 +269,11 @@ export class LoopOrchestrator {
269269
}
270270
}
271271

272-
private createExitResult(
272+
private async createExitResult(
273273
ctx: ExecutionContext,
274274
loopId: string,
275275
scope: LoopScope
276-
): LoopContinuationResult {
276+
): Promise<LoopContinuationResult> {
277277
const results = scope.allIterationOutputs
278278
const output = { results }
279279
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
@@ -282,19 +282,26 @@ export class LoopOrchestrator {
282282
const now = new Date().toISOString()
283283
const iterationContext = buildContainerIterationContext(ctx, loopId)
284284

285-
this.contextExtensions.onBlockComplete(
286-
loopId,
287-
'Loop',
288-
'loop',
289-
{
290-
output,
291-
executionTime: DEFAULTS.EXECUTION_TIME,
292-
startedAt: now,
293-
executionOrder: getNextExecutionOrder(ctx),
294-
endedAt: now,
295-
},
296-
iterationContext
297-
)
285+
try {
286+
await this.contextExtensions.onBlockComplete(
287+
loopId,
288+
'Loop',
289+
'loop',
290+
{
291+
output,
292+
executionTime: DEFAULTS.EXECUTION_TIME,
293+
startedAt: now,
294+
executionOrder: getNextExecutionOrder(ctx),
295+
endedAt: now,
296+
},
297+
iterationContext
298+
)
299+
} catch (error) {
300+
logger.warn('Loop completion callback failed', {
301+
loopId,
302+
error: error instanceof Error ? error.message : String(error),
303+
})
304+
}
298305
}
299306

300307
return {

apps/sim/executor/orchestrators/node.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ export class NodeExecutionOrchestrator {
9292
const isParallelSentinel = node.metadata.isParallelSentinel
9393

9494
if (isParallelSentinel) {
95-
return this.handleParallelSentinel(ctx, node, sentinelType, parallelId)
95+
return await this.handleParallelSentinel(ctx, node, sentinelType, parallelId)
9696
}
9797

9898
switch (sentinelType) {
@@ -142,12 +142,12 @@ export class NodeExecutionOrchestrator {
142142
}
143143
}
144144

145-
private handleParallelSentinel(
145+
private async handleParallelSentinel(
146146
ctx: ExecutionContext,
147147
node: DAGNode,
148148
sentinelType: string | undefined,
149149
parallelId: string | undefined
150-
): NormalizedBlockOutput {
150+
): Promise<NormalizedBlockOutput> {
151151
if (!parallelId) {
152152
logger.warn('Parallel sentinel called without parallelId')
153153
return {}
@@ -176,7 +176,7 @@ export class NodeExecutionOrchestrator {
176176
}
177177

178178
if (sentinelType === 'end') {
179-
const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
179+
const result = await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
180180
return {
181181
results: result.results || [],
182182
sentinelEnd: true,
@@ -210,7 +210,7 @@ export class NodeExecutionOrchestrator {
210210
} else if (isParallelBranch) {
211211
const parallelId = this.findParallelIdForNode(node.id)
212212
if (parallelId) {
213-
this.handleParallelNodeCompletion(ctx, node, output, parallelId)
213+
await this.handleParallelNodeCompletion(ctx, node, output, parallelId)
214214
} else {
215215
this.handleRegularNodeCompletion(ctx, node, output)
216216
}
@@ -229,12 +229,12 @@ export class NodeExecutionOrchestrator {
229229
this.state.setBlockOutput(node.id, output)
230230
}
231231

232-
private handleParallelNodeCompletion(
232+
private async handleParallelNodeCompletion(
233233
ctx: ExecutionContext,
234234
node: DAGNode,
235235
output: NormalizedBlockOutput,
236236
parallelId: string
237-
): void {
237+
): Promise<void> {
238238
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
239239
if (!scope) {
240240
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
@@ -248,7 +248,7 @@ export class NodeExecutionOrchestrator {
248248
output
249249
)
250250
if (allComplete) {
251-
this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
251+
await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
252252
}
253253

254254
this.state.setBlockOutput(node.id, output)

apps/sim/executor/orchestrators/parallel.ts

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ export class ParallelOrchestrator {
291291
return allComplete
292292
}
293293

294-
aggregateParallelResults(ctx: ExecutionContext, parallelId: string): ParallelAggregationResult {
294+
async aggregateParallelResults(
295+
ctx: ExecutionContext,
296+
parallelId: string
297+
): Promise<ParallelAggregationResult> {
295298
const scope = ctx.parallelExecutions?.get(parallelId)
296299
if (!scope) {
297300
logger.error('Parallel scope not found for aggregation', { parallelId })
@@ -316,19 +319,26 @@ export class ParallelOrchestrator {
316319
const now = new Date().toISOString()
317320
const iterationContext = buildContainerIterationContext(ctx, parallelId)
318321

319-
this.contextExtensions.onBlockComplete(
320-
parallelId,
321-
'Parallel',
322-
'parallel',
323-
{
324-
output,
325-
executionTime: 0,
326-
startedAt: now,
327-
executionOrder: getNextExecutionOrder(ctx),
328-
endedAt: now,
329-
},
330-
iterationContext
331-
)
322+
try {
323+
await this.contextExtensions.onBlockComplete(
324+
parallelId,
325+
'Parallel',
326+
'parallel',
327+
{
328+
output,
329+
executionTime: 0,
330+
startedAt: now,
331+
executionOrder: getNextExecutionOrder(ctx),
332+
endedAt: now,
333+
},
334+
iterationContext
335+
)
336+
} catch (error) {
337+
logger.warn('Parallel completion callback failed', {
338+
parallelId,
339+
error: error instanceof Error ? error.message : String(error),
340+
})
341+
}
332342
}
333343

334344
return {

0 commit comments

Comments
 (0)