Skip to content

Commit c3d9c23

Browse files
fix(execution): finalize runs before wrapper recovery
1 parent 9295499 commit c3d9c23

File tree

7 files changed

+601
-72
lines changed

7 files changed

+601
-72
lines changed

apps/sim/background/schedule-execution.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core
88
import { preprocessExecution } from '@/lib/execution/preprocessing'
99
import { LoggingSession } from '@/lib/logs/execution/logging-session'
1010
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
11-
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
11+
import {
12+
executeWorkflowCore,
13+
wasExecutionFinalizedByCore,
14+
} from '@/lib/workflows/executor/execution-core'
1215
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
1316
import {
1417
blockExistsInDeployment,
@@ -249,6 +252,10 @@ async function runWorkflowExecution({
249252
} catch (error: unknown) {
250253
logger.error(`[${requestId}] Early failure in scheduled workflow ${payload.workflowId}`, error)
251254

255+
if (wasExecutionFinalizedByCore(error, executionId)) {
256+
throw error
257+
}
258+
252259
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
253260
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
254261

apps/sim/background/webhook-execution.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import { LoggingSession } from '@/lib/logs/execution/logging-session'
1212
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
1313
import { WebhookAttachmentProcessor } from '@/lib/webhooks/attachment-processor'
1414
import { fetchAndProcessAirtablePayloads, formatWebhookInput } from '@/lib/webhooks/utils.server'
15-
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
15+
import {
16+
executeWorkflowCore,
17+
wasExecutionFinalizedByCore,
18+
} from '@/lib/workflows/executor/execution-core'
1619
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
1720
import { loadDeployedWorkflowState } from '@/lib/workflows/persistence/utils'
1821
import { resolveOAuthAccountId } from '@/app/api/auth/oauth/utils'
@@ -623,6 +626,10 @@ async function executeWebhookJobInternal(
623626
provider: payload.provider,
624627
})
625628

629+
if (wasExecutionFinalizedByCore(error, executionId)) {
630+
throw error
631+
}
632+
626633
try {
627634
await loggingSession.safeStart({
628635
userId: payload.userId,

apps/sim/background/workflow-execution.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import { createTimeoutAbortController, getTimeoutErrorMessage } from '@/lib/core
55
import { preprocessExecution } from '@/lib/execution/preprocessing'
66
import { LoggingSession } from '@/lib/logs/execution/logging-session'
77
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
8-
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
8+
import {
9+
executeWorkflowCore,
10+
wasExecutionFinalizedByCore,
11+
} from '@/lib/workflows/executor/execution-core'
912
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
1013
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
1114
import type { ExecutionMetadata } from '@/executor/execution/types'
@@ -178,6 +181,10 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
178181
executionId,
179182
})
180183

184+
if (wasExecutionFinalizedByCore(error, executionId)) {
185+
throw error
186+
}
187+
181188
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
182189
const { traceSpans } = executionResult ? buildTraceSpans(executionResult) : { traceSpans: [] }
183190

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { describe, expect, it, vi } from 'vitest'
2+
3+
vi.mock('@sim/db', () => ({
4+
db: {},
5+
}))
6+
7+
vi.mock('@sim/db/schema', () => ({
8+
workflowExecutionLogs: {},
9+
}))
10+
11+
vi.mock('@sim/logger', () => ({
12+
createLogger: () => ({
13+
info: vi.fn(),
14+
error: vi.fn(),
15+
warn: vi.fn(),
16+
debug: vi.fn(),
17+
}),
18+
}))
19+
20+
vi.mock('drizzle-orm', () => ({
21+
eq: vi.fn(),
22+
sql: vi.fn(),
23+
}))
24+
25+
vi.mock('@/lib/logs/execution/logger', () => ({
26+
executionLogger: {
27+
startWorkflowExecution: vi.fn(),
28+
completeWorkflowExecution: vi.fn(),
29+
},
30+
}))
31+
32+
vi.mock('@/lib/logs/execution/logging-factory', () => ({
33+
calculateCostSummary: vi.fn().mockReturnValue({
34+
totalCost: 0,
35+
totalInputCost: 0,
36+
totalOutputCost: 0,
37+
totalTokens: 0,
38+
totalPromptTokens: 0,
39+
totalCompletionTokens: 0,
40+
baseExecutionCharge: 0,
41+
modelCost: 0,
42+
models: {},
43+
}),
44+
createEnvironmentObject: vi.fn(),
45+
createTriggerObject: vi.fn(),
46+
loadDeployedWorkflowStateForLogging: vi.fn(),
47+
loadWorkflowStateForExecution: vi.fn(),
48+
}))
49+
50+
import { LoggingSession } from './logging-session'
51+
52+
describe('LoggingSession completion retries', () => {
53+
it('clears failed completion promise so error finalization can retry', async () => {
54+
const session = new LoggingSession('workflow-1', 'execution-1', 'api', 'req-1') as any
55+
56+
const successFinalizeError = new Error('success finalize failed')
57+
session.complete = vi.fn().mockRejectedValue(successFinalizeError)
58+
session.completeWithCostOnlyLog = vi.fn().mockRejectedValue(successFinalizeError)
59+
session.completeWithError = vi.fn().mockResolvedValue(undefined)
60+
61+
await expect(session.safeComplete({ finalOutput: { ok: true } })).rejects.toThrow(
62+
'success finalize failed'
63+
)
64+
65+
await expect(
66+
session.safeCompleteWithError({
67+
error: { message: 'fallback error finalize' },
68+
})
69+
).resolves.toBeUndefined()
70+
71+
expect(session.complete).toHaveBeenCalledTimes(1)
72+
expect(session.completeWithCostOnlyLog).toHaveBeenCalledTimes(1)
73+
expect(session.completeWithError).toHaveBeenCalledTimes(1)
74+
})
75+
})

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,10 @@ export class LoggingSession {
690690

691691
async safeComplete(params: SessionCompleteParams = {}): Promise<void> {
692692
if (this.completionPromise) return this.completionPromise
693-
this.completionPromise = this._safeCompleteImpl(params)
693+
this.completionPromise = this._safeCompleteImpl(params).catch((error) => {
694+
this.completionPromise = null
695+
throw error
696+
})
694697
return this.completionPromise
695698
}
696699

@@ -715,7 +718,10 @@ export class LoggingSession {
715718

716719
async safeCompleteWithError(params?: SessionErrorCompleteParams): Promise<void> {
717720
if (this.completionPromise) return this.completionPromise
718-
this.completionPromise = this._safeCompleteWithErrorImpl(params)
721+
this.completionPromise = this._safeCompleteWithErrorImpl(params).catch((error) => {
722+
this.completionPromise = null
723+
throw error
724+
})
719725
return this.completionPromise
720726
}
721727

@@ -742,7 +748,10 @@ export class LoggingSession {
742748

743749
async safeCompleteWithCancellation(params?: SessionCancelledParams): Promise<void> {
744750
if (this.completionPromise) return this.completionPromise
745-
this.completionPromise = this._safeCompleteWithCancellationImpl(params)
751+
this.completionPromise = this._safeCompleteWithCancellationImpl(params).catch((error) => {
752+
this.completionPromise = null
753+
throw error
754+
})
746755
return this.completionPromise
747756
}
748757

@@ -768,7 +777,10 @@ export class LoggingSession {
768777

769778
async safeCompleteWithPause(params?: SessionPausedParams): Promise<void> {
770779
if (this.completionPromise) return this.completionPromise
771-
this.completionPromise = this._safeCompleteWithPauseImpl(params)
780+
this.completionPromise = this._safeCompleteWithPauseImpl(params).catch((error) => {
781+
this.completionPromise = null
782+
throw error
783+
})
772784
return this.completionPromise
773785
}
774786

0 commit comments

Comments
 (0)