Skip to content

Commit 42cc70c

Browse files
fix(async): keep execution finalization state consistent
Retry minimal logging for early failures, only mark core finalization after a log row actually completes, and let paused completions fall back cleanly.
1 parent e314ae0 commit 42cc70c

File tree

4 files changed

+158
-10
lines changed

4 files changed

+158
-10
lines changed

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,49 @@ describe('LoggingSession completion retries', () => {
8989

9090
expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2)
9191
})
92+
93+
it('marks paused completions as completed and deduplicates later attempts', async () => {
94+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1')
95+
96+
completeWorkflowExecutionMock.mockResolvedValue({})
97+
98+
await expect(
99+
session.safeCompleteWithPause({
100+
endedAt: new Date().toISOString(),
101+
totalDurationMs: 10,
102+
traceSpans: [],
103+
workflowInput: { hello: 'world' },
104+
})
105+
).resolves.toBeUndefined()
106+
107+
expect(session.hasCompleted()).toBe(true)
108+
109+
await expect(
110+
session.safeCompleteWithError({
111+
error: { message: 'should be ignored' },
112+
})
113+
).resolves.toBeUndefined()
114+
115+
expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(1)
116+
})
117+
118+
it('falls back to cost-only logging when paused completion fails', async () => {
119+
const session = new LoggingSession('workflow-1', 'execution-2', 'api', 'req-1')
120+
121+
completeWorkflowExecutionMock
122+
.mockRejectedValueOnce(new Error('pause finalize failed'))
123+
.mockResolvedValueOnce({})
124+
125+
await expect(
126+
session.safeCompleteWithPause({
127+
endedAt: new Date().toISOString(),
128+
totalDurationMs: 10,
129+
traceSpans: [],
130+
workflowInput: { hello: 'world' },
131+
})
132+
).resolves.toBeUndefined()
133+
134+
expect(session.hasCompleted()).toBe(true)
135+
expect(completeWorkflowExecutionMock).toHaveBeenCalledTimes(2)
136+
})
92137
})

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,11 @@ export class LoggingSession {
549549
}
550550

551551
async completeWithPause(params: SessionPausedParams = {}): Promise<void> {
552+
if (this.completed || this.completing) {
553+
return
554+
}
555+
this.completing = true
556+
552557
try {
553558
const { endedAt, totalDurationMs, traceSpans, workflowInput } = params
554559

@@ -580,6 +585,8 @@ export class LoggingSession {
580585
status: 'pending',
581586
})
582587

588+
this.completed = true
589+
583590
try {
584591
const { PlatformEvents, createOTelSpansForWorkflowExecution } = await import(
585592
'@/lib/core/telemetry'
@@ -616,6 +623,7 @@ export class LoggingSession {
616623
)
617624
}
618625
} catch (pauseError) {
626+
this.completing = false
619627
logger.error(`Failed to complete paused logging for execution ${this.executionId}:`, {
620628
requestId: this.requestId,
621629
workflowId: this.workflowId,
@@ -699,6 +707,10 @@ export class LoggingSession {
699707
}
700708
}
701709

710+
hasCompleted(): boolean {
711+
return this.completed
712+
}
713+
702714
private shouldStartNewCompletionAttempt(attempt: CompletionAttempt): boolean {
703715
return this.completionAttemptFailed && this.completionAttempt !== 'error' && attempt === 'error'
704716
}

apps/sim/lib/workflows/executor/execution-core.test.ts

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const {
1010
safeCompleteWithErrorMock,
1111
safeCompleteWithCancellationMock,
1212
safeCompleteWithPauseMock,
13+
hasCompletedMock,
1314
updateWorkflowRunCountsMock,
1415
clearExecutionCancellationMock,
1516
buildTraceSpansMock,
@@ -25,6 +26,7 @@ const {
2526
safeCompleteWithErrorMock: vi.fn(),
2627
safeCompleteWithCancellationMock: vi.fn(),
2728
safeCompleteWithPauseMock: vi.fn(),
29+
hasCompletedMock: vi.fn(),
2830
updateWorkflowRunCountsMock: vi.fn(),
2931
clearExecutionCancellationMock: vi.fn(),
3032
buildTraceSpansMock: vi.fn(),
@@ -102,6 +104,7 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
102104
safeCompleteWithError: safeCompleteWithErrorMock,
103105
safeCompleteWithCancellation: safeCompleteWithCancellationMock,
104106
safeCompleteWithPause: safeCompleteWithPauseMock,
107+
hasCompleted: hasCompletedMock,
105108
}
106109

107110
const createSnapshot = () => ({
@@ -165,11 +168,12 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
165168
mergeSubblockStateWithValuesMock.mockImplementation((blocks) => blocks)
166169
serializeWorkflowMock.mockReturnValue({ loops: {}, parallels: {} })
167170
buildTraceSpansMock.mockReturnValue({ traceSpans: [{ id: 'span-1' }], totalDuration: 123 })
168-
safeStartMock.mockResolvedValue(undefined)
171+
safeStartMock.mockResolvedValue(true)
169172
safeCompleteMock.mockResolvedValue(undefined)
170173
safeCompleteWithErrorMock.mockResolvedValue(undefined)
171174
safeCompleteWithCancellationMock.mockResolvedValue(undefined)
172175
safeCompleteWithPauseMock.mockResolvedValue(undefined)
176+
hasCompletedMock.mockReturnValue(true)
173177
updateWorkflowRunCountsMock.mockResolvedValue(undefined)
174178
clearExecutionCancellationMock.mockResolvedValue(undefined)
175179
})
@@ -444,4 +448,77 @@ describe('executeWorkflowCore terminal finalization sequencing', () => {
444448

445449
expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
446450
})
451+
452+
it('does not mark core finalization when error completion never persists a log row', async () => {
453+
const error = new Error('engine failed')
454+
executorExecuteMock.mockRejectedValue(error)
455+
hasCompletedMock.mockReturnValue(false)
456+
const snapshot = {
457+
...createSnapshot(),
458+
metadata: {
459+
...createSnapshot().metadata,
460+
executionId: 'execution-unfinalized',
461+
},
462+
}
463+
464+
await expect(
465+
executeWorkflowCore({
466+
snapshot: snapshot as any,
467+
callbacks: {},
468+
loggingSession: loggingSession as any,
469+
})
470+
).rejects.toBe(error)
471+
472+
expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
473+
expect(wasExecutionFinalizedByCore(error, 'execution-unfinalized')).toBe(false)
474+
})
475+
476+
it('starts a minimal log session before error completion when setup fails early', async () => {
477+
const envError = new Error('env lookup failed')
478+
getPersonalAndWorkspaceEnvMock.mockRejectedValue(envError)
479+
480+
await expect(
481+
executeWorkflowCore({
482+
snapshot: createSnapshot() as any,
483+
callbacks: {},
484+
loggingSession: loggingSession as any,
485+
})
486+
).rejects.toBe(envError)
487+
488+
expect(safeStartMock).toHaveBeenCalledTimes(1)
489+
expect(safeStartMock).toHaveBeenCalledWith(
490+
expect.objectContaining({
491+
userId: 'user-1',
492+
workspaceId: 'workspace-1',
493+
variables: {},
494+
})
495+
)
496+
expect(safeCompleteWithErrorMock).toHaveBeenCalledTimes(1)
497+
expect(wasExecutionFinalizedByCore(envError, 'execution-1')).toBe(true)
498+
})
499+
500+
it('skips core finalization when minimal error logging cannot start', async () => {
501+
const envError = new Error('env lookup failed')
502+
getPersonalAndWorkspaceEnvMock.mockRejectedValue(envError)
503+
safeStartMock.mockResolvedValue(false)
504+
const snapshot = {
505+
...createSnapshot(),
506+
metadata: {
507+
...createSnapshot().metadata,
508+
executionId: 'execution-no-log-start',
509+
},
510+
}
511+
512+
await expect(
513+
executeWorkflowCore({
514+
snapshot: snapshot as any,
515+
callbacks: {},
516+
loggingSession: loggingSession as any,
517+
})
518+
).rejects.toBe(envError)
519+
520+
expect(safeStartMock).toHaveBeenCalledTimes(1)
521+
expect(safeCompleteWithErrorMock).not.toHaveBeenCalled()
522+
expect(wasExecutionFinalizedByCore(envError, 'execution-no-log-start')).toBe(false)
523+
})
447524
})

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ async function finalizeExecutionError(params: {
234234
traceSpans,
235235
})
236236

237-
return true
237+
return loggingSession.hasCompleted()
238238
} catch (postExecError) {
239239
logger.error(`[${requestId}] Post-execution error logging failed`, {
240240
error: postExecError,
@@ -270,13 +270,14 @@ export async function executeWorkflowCore(
270270
}
271271

272272
let processedInput = input || {}
273+
let deploymentVersionId: string | undefined
274+
let loggingStarted = false
273275

274276
try {
275277
let blocks
276278
let edges: Edge[]
277279
let loops
278280
let parallels
279-
let deploymentVersionId: string | undefined
280281

281282
// Use workflowStateOverride if provided (for diff workflows)
282283
if (metadata.workflowStateOverride) {
@@ -333,7 +334,7 @@ export async function executeWorkflowCore(
333334
// Use already-decrypted values for execution (no redundant decryption)
334335
const decryptedEnvVars: Record<string, string> = { ...personalDecrypted, ...workspaceDecrypted }
335336

336-
await loggingSession.safeStart({
337+
loggingStarted = await loggingSession.safeStart({
337338
userId,
338339
workspaceId: providedWorkspaceId,
339340
variables,
@@ -520,12 +521,25 @@ export async function executeWorkflowCore(
520521
} catch (error: unknown) {
521522
logger.error(`[${requestId}] Execution failed:`, error)
522523

523-
const finalized = await finalizeExecutionError({
524-
error,
525-
loggingSession,
526-
executionId,
527-
requestId,
528-
})
524+
if (!loggingStarted) {
525+
loggingStarted = await loggingSession.safeStart({
526+
userId,
527+
workspaceId: providedWorkspaceId,
528+
variables: {},
529+
triggerData: metadata.correlation ? { correlation: metadata.correlation } : undefined,
530+
skipLogCreation,
531+
deploymentVersionId,
532+
})
533+
}
534+
535+
const finalized = loggingStarted
536+
? await finalizeExecutionError({
537+
error,
538+
loggingSession,
539+
executionId,
540+
requestId,
541+
})
542+
: false
529543

530544
if (finalized) {
531545
markExecutionFinalizedByCore(error, executionId)

0 commit comments

Comments
 (0)