Skip to content

Commit 78db4ee

Browse files
fix(async): preserve execution correlation across queued runs
1 parent c3d9c23 commit 78db4ee

File tree

22 files changed

+2031
-751
lines changed

22 files changed

+2031
-751
lines changed

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,11 @@ vi.mock('@sim/db', () => ({
107107
},
108108
}))
109109

110-
import { GET } from '@/app/api/schedules/execute/route'
110+
vi.mock('uuid', () => ({
111+
v4: vi.fn().mockReturnValue('schedule-execution-1'),
112+
}))
113+
114+
import { GET } from './route'
111115

112116
const SINGLE_SCHEDULE = [
113117
{
@@ -204,4 +208,44 @@ describe('Scheduled Workflow Execution API Route', () => {
204208
const data = await response.json()
205209
expect(data).toHaveProperty('executedCount', 2)
206210
})
211+
212+
it('should enqueue preassigned correlation metadata for schedules', async () => {
213+
mockDbReturning.mockReturnValue(SINGLE_SCHEDULE)
214+
215+
const response = await GET(createMockRequest())
216+
217+
expect(response.status).toBe(200)
218+
expect(mockEnqueue).toHaveBeenCalledWith(
219+
'schedule-execution',
220+
expect.objectContaining({
221+
scheduleId: 'schedule-1',
222+
workflowId: 'workflow-1',
223+
executionId: 'schedule-execution-1',
224+
requestId: 'test-request-id',
225+
correlation: {
226+
executionId: 'schedule-execution-1',
227+
requestId: 'test-request-id',
228+
source: 'schedule',
229+
workflowId: 'workflow-1',
230+
scheduleId: 'schedule-1',
231+
triggerType: 'schedule',
232+
scheduledFor: '2025-01-01T00:00:00.000Z',
233+
},
234+
}),
235+
{
236+
metadata: {
237+
workflowId: 'workflow-1',
238+
correlation: {
239+
executionId: 'schedule-execution-1',
240+
requestId: 'test-request-id',
241+
source: 'schedule',
242+
workflowId: 'workflow-1',
243+
scheduleId: 'schedule-1',
244+
triggerType: 'schedule',
245+
scheduledFor: '2025-01-01T00:00:00.000Z',
246+
},
247+
},
248+
}
249+
)
250+
})
207251
})

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
22
import { createLogger } from '@sim/logger'
33
import { and, eq, isNull, lt, lte, not, or, sql } from 'drizzle-orm'
44
import { type NextRequest, NextResponse } from 'next/server'
5+
import { v4 as uuidv4 } from 'uuid'
56
import { verifyCronAuth } from '@/lib/auth/internal'
67
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
78
import { generateRequestId } from '@/lib/core/utils/request'
@@ -57,10 +58,23 @@ export async function GET(request: NextRequest) {
5758

5859
const queuePromises = dueSchedules.map(async (schedule) => {
5960
const queueTime = schedule.lastQueuedAt ?? queuedAt
61+
const executionId = uuidv4()
62+
const correlation = {
63+
executionId,
64+
requestId,
65+
source: 'schedule' as const,
66+
workflowId: schedule.workflowId,
67+
scheduleId: schedule.id,
68+
triggerType: 'schedule',
69+
scheduledFor: schedule.nextRunAt?.toISOString(),
70+
}
6071

6172
const payload = {
6273
scheduleId: schedule.id,
6374
workflowId: schedule.workflowId,
75+
executionId,
76+
requestId,
77+
correlation,
6478
blockId: schedule.blockId || undefined,
6579
cronExpression: schedule.cronExpression || undefined,
6680
lastRanAt: schedule.lastRanAt?.toISOString(),
@@ -71,7 +85,7 @@ export async function GET(request: NextRequest) {
7185

7286
try {
7387
const jobId = await jobQueue.enqueue('schedule-execution', payload, {
74-
metadata: { workflowId: schedule.workflowId },
88+
metadata: { workflowId: schedule.workflowId, correlation },
7589
})
7690
logger.info(
7791
`[${requestId}] Queued schedule execution task ${jobId} for workflow ${schedule.workflowId}`

apps/sim/app/api/webhooks/trigger/[path]/route.test.ts

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ const {
101101
processWebhookMock,
102102
executeMock,
103103
getWorkspaceBilledAccountUserIdMock,
104+
queueWebhookExecutionMock,
104105
} = vi.hoisted(() => ({
105106
generateRequestHashMock: vi.fn().mockResolvedValue('test-hash-123'),
106107
validateSlackSignatureMock: vi.fn().mockResolvedValue(true),
@@ -125,6 +126,10 @@ const {
125126
.mockImplementation(async (workspaceId: string | null | undefined) =>
126127
workspaceId ? 'test-user-id' : null
127128
),
129+
queueWebhookExecutionMock: vi.fn().mockImplementation(async () => {
130+
const { NextResponse } = await import('next/server')
131+
return NextResponse.json({ message: 'Webhook processed' })
132+
}),
128133
}))
129134

130135
vi.mock('@trigger.dev/sdk', () => ({
@@ -333,12 +338,7 @@ vi.mock('@/lib/webhooks/processor', () => ({
333338
}),
334339
shouldSkipWebhookEvent: vi.fn().mockReturnValue(false),
335340
handlePreDeploymentVerification: vi.fn().mockReturnValue(null),
336-
queueWebhookExecution: vi.fn().mockImplementation(async () => {
337-
// Call processWebhookMock so tests can verify it was called
338-
processWebhookMock()
339-
const { NextResponse } = await import('next/server')
340-
return NextResponse.json({ message: 'Webhook processed' })
341-
}),
341+
queueWebhookExecution: queueWebhookExecutionMock,
342342
}))
343343

344344
vi.mock('drizzle-orm/postgres-js', () => ({
@@ -353,7 +353,7 @@ vi.mock('@/lib/core/utils/request', () => requestUtilsMock)
353353

354354
process.env.DATABASE_URL = 'postgresql://test:test@localhost:5432/test'
355355

356-
import { POST } from '@/app/api/webhooks/trigger/[path]/route'
356+
import { POST } from './route'
357357

358358
describe('Webhook Trigger API Route', () => {
359359
beforeEach(() => {
@@ -393,7 +393,7 @@ describe('Webhook Trigger API Route', () => {
393393

394394
const params = Promise.resolve({ path: 'non-existent-path' })
395395

396-
const response = await POST(req, { params })
396+
const response = await POST(req as any, { params })
397397

398398
expect(response.status).toBe(404)
399399

@@ -402,6 +402,32 @@ describe('Webhook Trigger API Route', () => {
402402
})
403403

404404
describe('Generic Webhook Authentication', () => {
405+
it('passes correlation-bearing request context into webhook queueing', async () => {
406+
testData.webhooks.push({
407+
id: 'generic-webhook-id',
408+
provider: 'generic',
409+
path: 'test-path',
410+
isActive: true,
411+
providerConfig: { requireAuth: false },
412+
workflowId: 'test-workflow-id',
413+
})
414+
415+
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
416+
const params = Promise.resolve({ path: 'test-path' })
417+
418+
const response = await POST(req as any, { params })
419+
420+
expect(response.status).toBe(200)
421+
expect(queueWebhookExecutionMock).toHaveBeenCalledOnce()
422+
const call = queueWebhookExecutionMock.mock.calls[0]
423+
expect(call[0]).toEqual(expect.objectContaining({ id: 'generic-webhook-id' }))
424+
expect(call[1]).toEqual(expect.objectContaining({ id: 'test-workflow-id' }))
425+
expect(call[2]).toEqual(expect.objectContaining({ event: 'test', id: 'test-123' }))
426+
expect(call[4]).toEqual(
427+
expect.objectContaining({ requestId: 'mock-request-id', path: 'test-path' })
428+
)
429+
})
430+
405431
it('should process generic webhook without authentication', async () => {
406432
testData.webhooks.push({
407433
id: 'generic-webhook-id',
@@ -422,7 +448,7 @@ describe('Webhook Trigger API Route', () => {
422448
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
423449
const params = Promise.resolve({ path: 'test-path' })
424450

425-
const response = await POST(req, { params })
451+
const response = await POST(req as any, { params })
426452

427453
expect(response.status).toBe(200)
428454

@@ -452,7 +478,7 @@ describe('Webhook Trigger API Route', () => {
452478
const req = createMockRequest('POST', { event: 'bearer.test' }, headers)
453479
const params = Promise.resolve({ path: 'test-path' })
454480

455-
const response = await POST(req, { params })
481+
const response = await POST(req as any, { params })
456482

457483
expect(response.status).toBe(200)
458484
})
@@ -483,7 +509,7 @@ describe('Webhook Trigger API Route', () => {
483509
const req = createMockRequest('POST', { event: 'custom.header.test' }, headers)
484510
const params = Promise.resolve({ path: 'test-path' })
485511

486-
const response = await POST(req, { params })
512+
const response = await POST(req as any, { params })
487513

488514
expect(response.status).toBe(200)
489515
})
@@ -518,7 +544,7 @@ describe('Webhook Trigger API Route', () => {
518544
const req = createMockRequest('POST', { event: 'case.test' }, headers)
519545
const params = Promise.resolve({ path: 'test-path' })
520546

521-
const response = await POST(req, { params })
547+
const response = await POST(req as any, { params })
522548

523549
expect(response.status).toBe(200)
524550
}
@@ -553,7 +579,7 @@ describe('Webhook Trigger API Route', () => {
553579
const req = createMockRequest('POST', { event: 'custom.case.test' }, headers)
554580
const params = Promise.resolve({ path: 'test-path' })
555581

556-
const response = await POST(req, { params })
582+
const response = await POST(req as any, { params })
557583

558584
expect(response.status).toBe(200)
559585
}
@@ -576,7 +602,7 @@ describe('Webhook Trigger API Route', () => {
576602
const req = createMockRequest('POST', { event: 'wrong.token.test' }, headers)
577603
const params = Promise.resolve({ path: 'test-path' })
578604

579-
const response = await POST(req, { params })
605+
const response = await POST(req as any, { params })
580606

581607
expect(response.status).toBe(401)
582608
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -604,7 +630,7 @@ describe('Webhook Trigger API Route', () => {
604630
const req = createMockRequest('POST', { event: 'wrong.custom.test' }, headers)
605631
const params = Promise.resolve({ path: 'test-path' })
606632

607-
const response = await POST(req, { params })
633+
const response = await POST(req as any, { params })
608634

609635
expect(response.status).toBe(401)
610636
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -624,7 +650,7 @@ describe('Webhook Trigger API Route', () => {
624650
const req = createMockRequest('POST', { event: 'no.auth.test' })
625651
const params = Promise.resolve({ path: 'test-path' })
626652

627-
const response = await POST(req, { params })
653+
const response = await POST(req as any, { params })
628654

629655
expect(response.status).toBe(401)
630656
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -652,7 +678,7 @@ describe('Webhook Trigger API Route', () => {
652678
const req = createMockRequest('POST', { event: 'exclusivity.test' }, headers)
653679
const params = Promise.resolve({ path: 'test-path' })
654680

655-
const response = await POST(req, { params })
681+
const response = await POST(req as any, { params })
656682

657683
expect(response.status).toBe(401)
658684
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -680,7 +706,7 @@ describe('Webhook Trigger API Route', () => {
680706
const req = createMockRequest('POST', { event: 'wrong.header.name.test' }, headers)
681707
const params = Promise.resolve({ path: 'test-path' })
682708

683-
const response = await POST(req, { params })
709+
const response = await POST(req as any, { params })
684710

685711
expect(response.status).toBe(401)
686712
expect(await response.text()).toContain('Unauthorized - Invalid authentication token')
@@ -705,7 +731,7 @@ describe('Webhook Trigger API Route', () => {
705731
const req = createMockRequest('POST', { event: 'no.token.config.test' }, headers)
706732
const params = Promise.resolve({ path: 'test-path' })
707733

708-
const response = await POST(req, { params })
734+
const response = await POST(req as any, { params })
709735

710736
expect(response.status).toBe(401)
711737
expect(await response.text()).toContain(

0 commit comments

Comments
 (0)