Skip to content

Commit 9229002

Browse files
PlaneInABottletest
andauthored
fix(execution): queued execution finalization and async correlation (#3535)
* fix(execution): finalize runs before wrapper recovery * fix(async): preserve execution correlation across queued runs * fix(webhooks): pass correlation into preprocessing * style(webhooks): normalize webhook executor formatting * fix(async): avoid pre-starting queued execution logs Let executeWorkflowCore own normal-path logging start so queued workflow and schedule executions persist the richer deployment and environment metadata instead of an earlier placeholder start record. * fix(async): harden execution finalization guards Prevent leaked core finalization markers from accumulating while keeping outer recovery paths idempotent. Preserve best-effort logging completion by reusing settled completion promises instead of reopening duplicate terminal writes. * fix(async): preserve outcomes during cleanup Keep execution finalization cleanup best-effort so cancellation cleanup failures do not overwrite successful or failed outcomes. Restore webhook processor formatting to the repository Biome style to avoid noisy formatter churn. * fix(async): keep execution finalization state consistent Retry minimal logging for early failures, only mark core finalization after a log row actually completes, and let paused completions fall back cleanly. * fix(async): clean stale finalization guards Scan all finalized execution ids during TTL cleanup so refreshed keys cannot keep expired guards alive, and cover the reused-id ordering regression. * fix(async): retry failed error finalization Allow error finalization to retry after a non-error completion and fallback both fail, and always persist failed/error semantics for completeWithError. * fix(webhooks): reuse preprocessing execution ids Thread preprocessing execution identity into queued webhook execution so both phases share the same correlation and logs. --------- Co-authored-by: test <test@example.com>
1 parent d84cba6 commit 9229002

26 files changed

+2453
-166
lines changed

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ vi.mock('@sim/db', () => ({
107107
},
108108
}))
109109

110-
import { GET } from '@/app/api/schedules/execute/route'
110+
vi.mock('uuid', () => ({
111+
v4: vi.fn().mockReturnValue('schedule-execution-1'),
112+
}))
113+
114+
import { GET } from './route'
111115

112116
const SINGLE_SCHEDULE = [
113117
{
@@ -204,4 +208,44 @@ describe('Scheduled Workflow Execution API Route', () => {
204208
const data = await response.json()
205209
expect(data).toHaveProperty('executedCount', 2)
206210
})
211+
212+
it('should enqueue preassigned correlation metadata for schedules', async () => {
213+
mockDbReturning.mockReturnValue(SINGLE_SCHEDULE)
214+
215+
const response = await GET(createMockRequest())
216+
217+
expect(response.status).toBe(200)
218+
expect(mockEnqueue).toHaveBeenCalledWith(
219+
'schedule-execution',
220+
expect.objectContaining({
221+
scheduleId: 'schedule-1',
222+
workflowId: 'workflow-1',
223+
executionId: 'schedule-execution-1',
224+
requestId: 'test-request-id',
225+
correlation: {
226+
executionId: 'schedule-execution-1',
227+
requestId: 'test-request-id',
228+
source: 'schedule',
229+
workflowId: 'workflow-1',
230+
scheduleId: 'schedule-1',
231+
triggerType: 'schedule',
232+
scheduledFor: '2025-01-01T00:00:00.000Z',
233+
},
234+
}),
235+
{
236+
metadata: {
237+
workflowId: 'workflow-1',
238+
correlation: {
239+
executionId: 'schedule-execution-1',
240+
requestId: 'test-request-id',
241+
source: 'schedule',
242+
workflowId: 'workflow-1',
243+
scheduleId: 'schedule-1',
244+
triggerType: 'schedule',
245+
scheduledFor: '2025-01-01T00:00:00.000Z',
246+
},
247+
},
248+
}
249+
)
250+
})
207251
})

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
22
import { createLogger } from '@sim/logger'
33
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
5+
import { v4 as uuidv4 } from 'uuid'
56
import { verifyCronAuth } from '@/lib/auth/internal'
67
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
78
import { generateRequestId } from '@/lib/core/utils/request'
@@ -57,10 +58,23 @@ export async function GET(request: NextRequest) {
5758

5859
const queuePromises = dueSchedules.map(async (schedule) => {
5960
const queueTime = schedule.lastQueuedAt ?? queuedAt
61+
const executionId = uuidv4()
62+
const correlation = {
63+
executionId,
64+
requestId,
65+
source: 'schedule' as const,
66+
workflowId: schedule.workflowId,
67+
scheduleId: schedule.id,
68+
triggerType: 'schedule',
69+
scheduledFor: schedule.nextRunAt?.toISOString(),
70+
}
6071

6172
const payload = {
6273
scheduleId: schedule.id,
6374
workflowId: schedule.workflowId,
75+
executionId,
76+
requestId,
77+
correlation,
6478
blockId: schedule.blockId || undefined,
6579
cronExpression: schedule.cronExpression || undefined,
6680
lastRanAt: schedule.lastRanAt?.toISOString(),
@@ -71,7 +85,7 @@ export async function GET(request: NextRequest) {
7185

7286
try {
7387
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
74-
metadata: { workflowId: schedule.workflowId },
88+
metadata: { workflowId: schedule.workflowId, correlation },
7589
})
7690
logger.info(
7791
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`

apps/sim/app/api/webhooks/trigger/[path]/route.test.ts

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ const {
101101
processWebhookMock,
102102
executeMock,
103103
getWorkspaceBilledAccountUserIdMock,
104+
queueWebhookExecutionMock,
104105
} = vi.hoisted(() => ({
105106
generateRequestHashMock: vi.fn().mockResolvedValue('test-hash-123'),
106107
validateSlackSignatureMock: vi.fn().mockResolvedValue(true),
@@ -125,6 +126,10 @@ const {
125126
.mockImplementation(async (workspaceId: string | null | undefined) =>
126127
workspaceId ? 'test-user-id' : null
127128
),
129+
queueWebhookExecutionMock: vi.fn().mockImplementation(async () => {
130+
const { NextResponse } = await import('next/server')
131+
return NextResponse.json({ message: 'Webhook processed' })
132+
}),
128133
}))
129134

130135
vi.mock('@trigger.dev/sdk', () => ({
@@ -350,21 +355,28 @@ vi.mock('@/lib/webhooks/processor', () => ({
350355
return null
351356
}
352357
),
353-
checkWebhookPreprocessing: vi
354-
.fn()
355-
.mockResolvedValue({ error: null, actorUserId: 'test-user-id' }),
358+
checkWebhookPreprocessing: vi.fn().mockResolvedValue({
359+
error: null,
360+
actorUserId: 'test-user-id',
361+
executionId: 'preprocess-execution-id',
362+
correlation: {
363+
executionId: 'preprocess-execution-id',
364+
requestId: 'mock-request-id',
365+
source: 'webhook',
366+
workflowId: 'test-workflow-id',
367+
webhookId: 'generic-webhook-id',
368+
path: 'test-path',
369+
provider: 'generic',
370+
triggerType: 'webhook',
371+
},
372+
}),
356373
formatProviderErrorResponse: vi.fn().mockImplementation((_webhook, error, status) => {
357374
const { NextResponse } = require('next/server')
358375
return NextResponse.json({ error }, { status })
359376
}),
360377
shouldSkipWebhookEvent: vi.fn().mockReturnValue(false),
361378
handlePreDeploymentVerification: vi.fn().mockReturnValue(null),
362-
queueWebhookExecution: vi.fn().mockImplementation(async () => {
363-
// Call processWebhookMock so tests can verify it was called
364-
processWebhookMock()
365-
const { NextResponse } = await import('next/server')
366-
return NextResponse.json({ message: 'Webhook processed' })
367-
}),
379+
queueWebhookExecution: queueWebhookExecutionMock,
368380
}))
369381

370382
vi.mock('drizzle-orm/postgres-js', () => ({
@@ -419,7 +431,7 @@ describe('Webhook Trigger API Route', () => {
419431

420432
const params = Promise.resolve({ path: 'non-existent-path' })
421433

422-
const response = await POST(req, { params })
434+
const response = await POST(req as any, { params })
423435

424436
expect(response.status).toBe(404)
425437

@@ -494,6 +506,47 @@ describe('Webhook Trigger API Route', () => {
494506
})
495507

496508
describe('Generic Webhook Authentication', () => {
509+
it('passes correlation-bearing request context into webhook queueing', async () => {
510+
testData.webhooks.push({
511+
id: 'generic-webhook-id',
512+
provider: 'generic',
513+
path: 'test-path',
514+
isActive: true,
515+
providerConfig: { requireAuth: false },
516+
workflowId: 'test-workflow-id',
517+
})
518+
519+
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
520+
const params = Promise.resolve({ path: 'test-path' })
521+
522+
const response = await POST(req as any, { params })
523+
524+
expect(response.status).toBe(200)
525+
expect(queueWebhookExecutionMock).toHaveBeenCalledOnce()
526+
const call = queueWebhookExecutionMock.mock.calls[0]
527+
expect(call[0]).toEqual(expect.objectContaining({ id: 'generic-webhook-id' }))
528+
expect(call[1]).toEqual(expect.objectContaining({ id: 'test-workflow-id' }))
529+
expect(call[2]).toEqual(expect.objectContaining({ event: 'test', id: 'test-123' }))
530+
expect(call[4]).toEqual(
531+
expect.objectContaining({
532+
requestId: 'mock-request-id',
533+
path: 'test-path',
534+
actorUserId: 'test-user-id',
535+
executionId: 'preprocess-execution-id',
536+
correlation: {
537+
executionId: 'preprocess-execution-id',
538+
requestId: 'mock-request-id',
539+
source: 'webhook',
540+
workflowId: 'test-workflow-id',
541+
webhookId: 'generic-webhook-id',
542+
path: 'test-path',
543+
provider: 'generic',
544+
triggerType: 'webhook',
545+
},
546+
})
547+
)
548+
})
549+
497550
it('should process generic webhook without authentication', async () => {
498551
testData.webhooks.push({
499552
id: 'generic-webhook-id',
@@ -514,7 +567,7 @@ describe('Webhook Trigger API Route', () => {
514567
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
515568
const params = Promise.resolve({ path: 'test-path' })
516569

517-
const response = await POST(req, { params })
570+
const response = await POST(req as any, { params })
518571

519572
expect(response.status).toBe(200)
520573

@@ -544,7 +597,7 @@ describe('Webhook Trigger API Route', () => {
544597
const req = createMockRequest('POST', { event: 'bearer.test' }, headers)
545598
const params = Promise.resolve({ path: 'test-path' })
546599

547-
const response = await POST(req, { params })
600+
const response = await POST(req as any, { params })
548601

549602
expect(response.status).toBe(200)
550603
})
@@ -575,7 +628,7 @@ describe('Webhook Trigger API Route', () => {
575628
const req = createMockRequest('POST', { event: 'custom.header.test' }, headers)
576629
const params = Promise.resolve({ path: 'test-path' })
577630

578-
const response = await POST(req, { params })
631+
const response = await POST(req as any, { params })
579632

580633
expect(response.status).toBe(200)
581634
})
@@ -610,7 +663,7 @@ describe('Webhook Trigger API Route', () => {
610663
const req = createMockRequest('POST', { event: 'case.test' }, headers)
611664
const params = Promise.resolve({ path: 'test-path' })
612665

613-
const response = await POST(req, { params })
666+
const response = await POST(req as any, { params })
614667

615668
expect(response.status).toBe(200)
616669
}
@@ -645,7 +698,7 @@ describe('Webhook Trigger API Route', () => {
645698
const req = createMockRequest('POST', { event: 'custom.case.test' }, headers)
646699
const params = Promise.resolve({ path: 'test-path' })
647700

648-
const response = await POST(req, { params })
701+
const response = await POST(req as any, { params })
649702

650703
expect(response.status).toBe(200)
651704
}
@@ -668,7 +721,7 @@ describe('Webhook Trigger API Route', () => {
668721
const req = createMockRequest('POST', { event: 'wrong.token.test' }, headers)
669722
const params = Promise.resolve({ path: 'test-path' })
670723

671-
const response = await POST(req, { params })
724+
const response = await POST(req as any, { params })
672725

673726
expect(response.status).toBe(401)
674727
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -696,7 +749,7 @@ describe('Webhook Trigger API Route', () => {
696749
const req = createMockRequest('POST', { event: 'wrong.custom.test' }, headers)
697750
const params = Promise.resolve({ path: 'test-path' })
698751

699-
const response = await POST(req, { params })
752+
const response = await POST(req as any, { params })
700753

701754
expect(response.status).toBe(401)
702755
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -716,7 +769,7 @@ describe('Webhook Trigger API Route', () => {
716769
const req = createMockRequest('POST', { event: 'no.auth.test' })
717770
const params = Promise.resolve({ path: 'test-path' })
718771

719-
const response = await POST(req, { params })
772+
const response = await POST(req as any, { params })
720773

721774
expect(response.status).toBe(401)
722775
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -744,7 +797,7 @@ describe('Webhook Trigger API Route', () => {
744797
const req = createMockRequest('POST', { event: 'exclusivity.test' }, headers)
745798
const params = Promise.resolve({ path: 'test-path' })
746799

747-
const response = await POST(req, { params })
800+
const response = await POST(req as any, { params })
748801

749802
expect(response.status).toBe(401)
750803
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -772,7 +825,7 @@ describe('Webhook Trigger API Route', () => {
772825
const req = createMockRequest('POST', { event: 'wrong.header.name.test' }, headers)
773826
const params = Promise.resolve({ path: 'test-path' })
774827

775-
const response = await POST(req, { params })
828+
const response = await POST(req as any, { params })
776829

777830
expect(response.status).toBe(401)
778831
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -797,7 +850,7 @@ describe('Webhook Trigger API Route', () => {
797850
const req = createMockRequest('POST', { event: 'no.token.config.test' }, headers)
798851
const params = Promise.resolve({ path: 'test-path' })
799852

800-
const response = await POST(req, { params })
853+
const response = await POST(req as any, { params })
801854

802855
expect(response.status).toBe(401)
803856
expect(await response.text()).toContain(

apps/sim/app/api/webhooks/trigger/[path]/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ export async function POST(
144144
requestId,
145145
path,
146146
actorUserId: preprocessResult.actorUserId,
147+
executionId: preprocessResult.executionId,
148+
correlation: preprocessResult.correlation,
147149
})
148150
responses.push(response)
149151
}

0 commit comments

Comments
 (0)