Skip to content

Commit 1e4b3e0

Browse files
MaxwellCalkinclaude
andcommitted
fix: resolve pause-resume race condition with transaction + retry (#3081)
When a resume request arrives before persistPauseResult finishes committing the paused execution record, enqueueOrStartResume fails with "Paused execution not found". This is critical for high-throughput automation where external systems resume milliseconds after pause. Two complementary fixes: 1. Wrap persistPauseResult's DB insert + queue processing in a single transaction so the pause record and any pending resume claims are atomically visible. The SELECT FOR UPDATE in enqueueOrStartResume will block on the row lock until this transaction commits. 2. Add exponential backoff retry (5 attempts, 50ms base delay) in enqueueOrStartResume for the specific "not found" error, handling the window where the row doesn't exist yet and SELECT FOR UPDATE can't acquire a lock on a non-existent row. Only the race-condition-specific error is retried; other errors (already resumed, snapshot not ready, etc.) fail immediately. Closes #3081 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8c0a2e0 commit 1e4b3e0

File tree

2 files changed

+405
-24
lines changed

2 files changed

+405
-24
lines changed
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/**
2+
* @vitest-environment node
3+
*
4+
* Tests for the pause-resume race condition fix in PauseResumeManager.
5+
* Verifies that enqueueOrStartResume retries with exponential backoff
6+
* when the paused execution record has not yet been persisted.
7+
*/
8+
import { loggerMock } from '@sim/testing'
9+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
10+
11+
vi.mock('@sim/logger', () => loggerMock)
12+
13+
vi.mock('@sim/db', () => ({
14+
db: {
15+
transaction: vi.fn(),
16+
insert: vi.fn(),
17+
select: vi.fn(),
18+
update: vi.fn(),
19+
},
20+
}))
21+
22+
vi.mock('@sim/db/schema', () => ({
23+
pausedExecutions: {
24+
executionId: 'executionId',
25+
id: 'id',
26+
},
27+
resumeQueue: {
28+
id: 'id',
29+
parentExecutionId: 'parentExecutionId',
30+
status: 'status',
31+
queuedAt: 'queuedAt',
32+
},
33+
workflowExecutionLogs: {},
34+
}))
35+
36+
vi.mock('drizzle-orm', () => ({
37+
eq: vi.fn((...args: unknown[]) => args),
38+
and: vi.fn((...args: unknown[]) => args),
39+
asc: vi.fn((col: unknown) => col),
40+
desc: vi.fn((col: unknown) => col),
41+
inArray: vi.fn((...args: unknown[]) => args),
42+
lt: vi.fn((...args: unknown[]) => args),
43+
sql: Object.assign(vi.fn(), { raw: vi.fn() }),
44+
}))
45+
46+
vi.mock('@/lib/core/execution-limits', () => ({
47+
createTimeoutAbortController: vi.fn(),
48+
getTimeoutErrorMessage: vi.fn(),
49+
}))
50+
51+
vi.mock('@/lib/execution/preprocessing', () => ({
52+
preprocessExecution: vi.fn(),
53+
}))
54+
55+
vi.mock('@/lib/logs/execution/logging-session', () => ({
56+
LoggingSession: vi.fn(),
57+
}))
58+
59+
vi.mock('@/lib/workflows/executor/execution-core', () => ({
60+
executeWorkflowCore: vi.fn(),
61+
}))
62+
63+
vi.mock('@/executor/execution/snapshot', () => ({
64+
ExecutionSnapshot: vi.fn(),
65+
}))
66+
67+
vi.mock('@/executor/utils/output-filter', () => ({
68+
filterOutputForLog: vi.fn(),
69+
}))
70+
71+
import { db } from '@sim/db'
72+
import { PauseResumeManager } from './human-in-the-loop-manager'
73+
74+
describe('PauseResumeManager', () => {
75+
beforeEach(() => {
76+
vi.clearAllMocks()
77+
vi.useFakeTimers()
78+
})
79+
80+
afterEach(() => {
81+
vi.useRealTimers()
82+
})
83+
84+
/**
85+
* Creates a mock transaction object that simulates Drizzle query chains.
86+
* The pausedExecution lookup uses: select().from().where().for('update').limit(1).then()
87+
* The activeResume lookup uses: select({id}).from().where().limit(1).then()
88+
*/
89+
function createMockTx(pausedExecution: Record<string, unknown> | null) {
90+
// Build a reusable terminal chain that resolves to []
91+
const emptyTerminal = () => ({
92+
limit: vi.fn().mockReturnValue({
93+
then: vi
94+
.fn()
95+
.mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])),
96+
}),
97+
then: vi
98+
.fn()
99+
.mockImplementation((resolve: (rows: unknown[]) => unknown) => resolve([])),
100+
})
101+
102+
// The first select() call is the pausedExecution lookup (with .for('update'))
103+
// The second select() call is the activeResume check (no .for())
104+
let selectCallCount = 0
105+
106+
return {
107+
select: vi.fn().mockImplementation(() => {
108+
selectCallCount++
109+
const isFirstSelect = selectCallCount === 1
110+
111+
return {
112+
from: vi.fn().mockReturnValue({
113+
where: vi.fn().mockReturnValue({
114+
// .for('update') path — pausedExecution lookup
115+
for: vi.fn().mockReturnValue({
116+
limit: vi.fn().mockReturnValue({
117+
then: vi.fn().mockImplementation(
118+
(resolve: (rows: unknown[]) => unknown) =>
119+
resolve(isFirstSelect && pausedExecution ? [pausedExecution] : [])
120+
),
121+
}),
122+
}),
123+
// .limit() path (no .for()) — activeResume lookup
124+
limit: vi.fn().mockReturnValue({
125+
then: vi.fn().mockImplementation(
126+
(resolve: (rows: unknown[]) => unknown) => resolve([])
127+
),
128+
}),
129+
// Direct .then() path
130+
then: vi.fn().mockImplementation(
131+
(resolve: (rows: unknown[]) => unknown) => resolve([])
132+
),
133+
}),
134+
}),
135+
}
136+
}),
137+
insert: vi.fn().mockReturnValue({
138+
values: vi.fn().mockReturnValue({
139+
returning: vi.fn().mockResolvedValue([{ id: 'rq-1', queuedAt: new Date() }]),
140+
}),
141+
}),
142+
update: vi.fn().mockReturnValue({
143+
set: vi.fn().mockReturnValue({
144+
where: vi.fn().mockResolvedValue(undefined),
145+
}),
146+
}),
147+
}
148+
}
149+
150+
function createValidPausedExecution() {
151+
return {
152+
id: 'pe-1',
153+
executionId: 'exec-1',
154+
workflowId: 'wf-1',
155+
pausePoints: {
156+
'ctx-1': {
157+
contextId: 'ctx-1',
158+
blockId: 'block-1',
159+
resumeStatus: 'paused',
160+
snapshotReady: true,
161+
},
162+
},
163+
}
164+
}
165+
166+
describe('enqueueOrStartResume - retry on race condition', () => {
167+
it('should retry when paused execution is not found and succeed on later attempt', async () => {
168+
let callCount = 0
169+
const mockedTransaction = vi.mocked(db.transaction)
170+
171+
mockedTransaction.mockImplementation(
172+
async (callback: (tx: unknown) => Promise<unknown>) => {
173+
callCount++
174+
if (callCount <= 2) {
175+
return callback(createMockTx(null))
176+
}
177+
return callback(createMockTx(createValidPausedExecution()))
178+
}
179+
)
180+
181+
const resultPromise = PauseResumeManager.enqueueOrStartResume({
182+
executionId: 'exec-1',
183+
contextId: 'ctx-1',
184+
resumeInput: { value: 'test' },
185+
userId: 'user-1',
186+
})
187+
188+
// Advance timers for retry delays (50ms, 100ms)
189+
await vi.advanceTimersByTimeAsync(50)
190+
await vi.advanceTimersByTimeAsync(100)
191+
192+
const result = await resultPromise
193+
194+
// Should have retried: 2 failures + 1 success = 3 calls
195+
expect(callCount).toBe(3)
196+
expect(result.status).toBe('starting')
197+
expect(result.resumeExecutionId).toBe('exec-1')
198+
})
199+
200+
it('should throw after exhausting all retry attempts', async () => {
201+
const mockedTransaction = vi.mocked(db.transaction)
202+
203+
// All attempts fail — pause record never appears
204+
mockedTransaction.mockImplementation(
205+
async (callback: (tx: unknown) => Promise<unknown>) => {
206+
return callback(createMockTx(null))
207+
}
208+
)
209+
210+
// Capture the rejection to prevent unhandled rejection warnings
211+
let caughtError: Error | undefined
212+
const resultPromise = PauseResumeManager.enqueueOrStartResume({
213+
executionId: 'exec-1',
214+
contextId: 'ctx-1',
215+
resumeInput: {},
216+
userId: 'user-1',
217+
}).catch((err: Error) => {
218+
caughtError = err
219+
})
220+
221+
// Advance timers for all retry delays: 50, 100, 200, 400ms
222+
await vi.advanceTimersByTimeAsync(800)
223+
await resultPromise
224+
225+
expect(caughtError).toBeDefined()
226+
expect(caughtError!.message).toBe('Paused execution not found or already resumed')
227+
})
228+
229+
it('should not retry for non-race-condition errors', async () => {
230+
let callCount = 0
231+
const mockedTransaction = vi.mocked(db.transaction)
232+
233+
const alreadyResumedExecution = {
234+
...createValidPausedExecution(),
235+
pausePoints: {
236+
'ctx-1': {
237+
contextId: 'ctx-1',
238+
blockId: 'block-1',
239+
resumeStatus: 'resumed', // Already resumed
240+
snapshotReady: true,
241+
},
242+
},
243+
}
244+
245+
mockedTransaction.mockImplementation(
246+
async (callback: (tx: unknown) => Promise<unknown>) => {
247+
callCount++
248+
return callback(createMockTx(alreadyResumedExecution))
249+
}
250+
)
251+
252+
await expect(
253+
PauseResumeManager.enqueueOrStartResume({
254+
executionId: 'exec-1',
255+
contextId: 'ctx-1',
256+
resumeInput: {},
257+
userId: 'user-1',
258+
})
259+
).rejects.toThrow('Pause point already resumed or in progress')
260+
261+
// Should NOT retry — this is a different error
262+
expect(callCount).toBe(1)
263+
})
264+
265+
it('should succeed immediately when paused execution exists on first try', async () => {
266+
let callCount = 0
267+
const mockedTransaction = vi.mocked(db.transaction)
268+
269+
mockedTransaction.mockImplementation(
270+
async (callback: (tx: unknown) => Promise<unknown>) => {
271+
callCount++
272+
return callback(createMockTx(createValidPausedExecution()))
273+
}
274+
)
275+
276+
const result = await PauseResumeManager.enqueueOrStartResume({
277+
executionId: 'exec-1',
278+
contextId: 'ctx-1',
279+
resumeInput: { value: 'test' },
280+
userId: 'user-1',
281+
})
282+
283+
// No retries needed
284+
expect(callCount).toBe(1)
285+
expect(result.status).toBe('starting')
286+
})
287+
})
288+
})

0 commit comments

Comments
 (0)