Skip to content

Commit 0b8b084

Browse files
author
test
committed
fix(sim): fallback manual cancel without redis
Abort active manual SSE executions locally when Redis cannot durably record the cancellation marker so the run still finalizes as cancelled instead of completing normally.
1 parent 4f22a7e commit 0b8b084

File tree

5 files changed

+107
-1
lines changed

5 files changed

+107
-1
lines changed

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import {
2020
} from '@/lib/execution/call-chain'
2121
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
2222
import { processInputFileFields } from '@/lib/execution/files'
23+
import {
24+
registerManualExecutionAborter,
25+
unregisterManualExecutionAborter,
26+
} from '@/lib/execution/manual-cancellation'
2327
import { preprocessExecution } from '@/lib/execution/preprocessing'
2428
import { LoggingSession } from '@/lib/logs/execution/logging-session'
2529
import {
@@ -763,6 +767,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
763767
const encoder = new TextEncoder()
764768
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
765769
let isStreamClosed = false
770+
let isManualAbortRegistered = false
766771

767772
const eventWriter = createExecutionEventWriter(executionId)
768773
setExecutionMeta(executionId, {
@@ -775,6 +780,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
775780
async start(controller) {
776781
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
777782

783+
registerManualExecutionAborter(executionId, timeoutController.abort)
784+
isManualAbortRegistered = true
785+
778786
const sendEvent = (event: ExecutionEvent) => {
779787
if (!isStreamClosed) {
780788
try {
@@ -1142,6 +1150,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11421150
})
11431151
finalMetaStatus = 'error'
11441152
} finally {
1153+
if (isManualAbortRegistered) {
1154+
unregisterManualExecutionAborter(executionId)
1155+
isManualAbortRegistered = false
1156+
}
11451157
try {
11461158
await eventWriter.close()
11471159
} catch (closeError) {

apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
88
const mockCheckHybridAuth = vi.fn()
99
const mockAuthorizeWorkflowByWorkspacePermission = vi.fn()
1010
const mockMarkExecutionCancelled = vi.fn()
11+
const mockAbortManualExecution = vi.fn()
1112

1213
vi.mock('@sim/logger', () => ({
1314
createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }),
@@ -21,6 +22,10 @@ vi.mock('@/lib/execution/cancellation', () => ({
2122
markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args),
2223
}))
2324

25+
vi.mock('@/lib/execution/manual-cancellation', () => ({
26+
abortManualExecution: (...args: unknown[]) => mockAbortManualExecution(...args),
27+
}))
28+
2429
vi.mock('@/lib/workflows/utils', () => ({
2530
authorizeWorkflowByWorkspacePermission: (params: unknown) =>
2631
mockAuthorizeWorkflowByWorkspacePermission(params),
@@ -33,6 +38,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
3338
vi.clearAllMocks()
3439
mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' })
3540
mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true })
41+
mockAbortManualExecution.mockReturnValue(false)
3642
})
3743

3844
it('returns success when cancellation was durably recorded', async () => {
@@ -56,6 +62,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
5662
executionId: 'ex-1',
5763
redisAvailable: true,
5864
durablyRecorded: true,
65+
locallyAborted: false,
5966
reason: 'recorded',
6067
})
6168
})
@@ -81,6 +88,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
8188
executionId: 'ex-1',
8289
redisAvailable: false,
8390
durablyRecorded: false,
91+
locallyAborted: false,
8492
reason: 'redis_unavailable',
8593
})
8694
})
@@ -106,7 +114,35 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
106114
executionId: 'ex-1',
107115
redisAvailable: true,
108116
durablyRecorded: false,
117+
locallyAborted: false,
109118
reason: 'redis_write_failed',
110119
})
111120
})
121+
122+
it('returns success when local fallback aborts execution without Redis durability', async () => {
123+
mockMarkExecutionCancelled.mockResolvedValue({
124+
durablyRecorded: false,
125+
reason: 'redis_unavailable',
126+
})
127+
mockAbortManualExecution.mockReturnValue(true)
128+
129+
const response = await POST(
130+
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
131+
method: 'POST',
132+
}),
133+
{
134+
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
135+
}
136+
)
137+
138+
expect(response.status).toBe(200)
139+
await expect(response.json()).resolves.toEqual({
140+
success: true,
141+
executionId: 'ex-1',
142+
redisAvailable: false,
143+
durablyRecorded: false,
144+
locallyAborted: true,
145+
reason: 'redis_unavailable',
146+
})
147+
})
112148
})

apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
22
import { type NextRequest, NextResponse } from 'next/server'
33
import { checkHybridAuth } from '@/lib/auth/hybrid'
44
import { markExecutionCancelled } from '@/lib/execution/cancellation'
5+
import { abortManualExecution } from '@/lib/execution/manual-cancellation'
56
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
67

78
const logger = createLogger('CancelExecutionAPI')
@@ -36,9 +37,12 @@ export async function POST(
3637
logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })
3738

3839
const cancellation = await markExecutionCancelled(executionId)
40+
const locallyAborted = abortManualExecution(executionId)
3941

4042
if (cancellation.durablyRecorded) {
4143
logger.info('Execution marked as cancelled in Redis', { executionId })
44+
} else if (locallyAborted) {
45+
logger.info('Execution cancelled via local in-process fallback', { executionId })
4246
} else {
4347
logger.warn('Execution cancellation was not durably recorded', {
4448
executionId,
@@ -47,10 +51,11 @@ export async function POST(
4751
}
4852

4953
return NextResponse.json({
50-
success: cancellation.durablyRecorded,
54+
success: cancellation.durablyRecorded || locallyAborted,
5155
executionId,
5256
redisAvailable: cancellation.reason !== 'redis_unavailable',
5357
durablyRecorded: cancellation.durablyRecorded,
58+
locallyAborted,
5459
reason: cancellation.reason,
5560
})
5661
} catch (error: any) {

apps/sim/lib/execution/cancellation.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ vi.mock('@/lib/core/config/redis', () => ({
1313
}))
1414

1515
import { markExecutionCancelled } from './cancellation'
16+
import {
17+
abortManualExecution,
18+
registerManualExecutionAborter,
19+
unregisterManualExecutionAborter,
20+
} from './manual-cancellation'
1621

1722
describe('markExecutionCancelled', () => {
1823
beforeEach(() => {
@@ -48,3 +53,32 @@ describe('markExecutionCancelled', () => {
4853
})
4954
})
5055
})
56+
57+
describe('manual execution cancellation registry', () => {
58+
beforeEach(() => {
59+
unregisterManualExecutionAborter('execution-1')
60+
})
61+
62+
it('aborts registered executions', () => {
63+
const abort = vi.fn()
64+
65+
registerManualExecutionAborter('execution-1', abort)
66+
67+
expect(abortManualExecution('execution-1')).toBe(true)
68+
expect(abort).toHaveBeenCalledTimes(1)
69+
})
70+
71+
it('returns false when no execution is registered', () => {
72+
expect(abortManualExecution('execution-missing')).toBe(false)
73+
})
74+
75+
it('unregisters executions', () => {
76+
const abort = vi.fn()
77+
78+
registerManualExecutionAborter('execution-1', abort)
79+
unregisterManualExecutionAborter('execution-1')
80+
81+
expect(abortManualExecution('execution-1')).toBe(false)
82+
expect(abort).not.toHaveBeenCalled()
83+
})
84+
})
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
const activeExecutionAborters = new Map<string, () => void>()
2+
3+
export function registerManualExecutionAborter(executionId: string, abort: () => void): void {
4+
activeExecutionAborters.set(executionId, abort)
5+
}
6+
7+
export function unregisterManualExecutionAborter(executionId: string): void {
8+
activeExecutionAborters.delete(executionId)
9+
}
10+
11+
export function abortManualExecution(executionId: string): boolean {
12+
const abort = activeExecutionAborters.get(executionId)
13+
if (!abort) {
14+
return false
15+
}
16+
17+
abort()
18+
return true
19+
}

0 commit comments

Comments
 (0)