Skip to content

Commit c9b731d

Browse files
committed
test(adapter): move queue-isolation tests from fake adapter to shared driver suite
1 parent 5ef7889 commit c9b731d

File tree

3 files changed

+72
-70
lines changed

3 files changed

+72
-70
lines changed

tests/_mocks/memory_adapter.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { parse } from '../../src/utils.js'
1313
interface ActiveJob {
1414
job: JobData
1515
acquiredAt: number
16+
queue: string
1617
}
1718

1819
interface DelayedJob {
@@ -119,7 +120,7 @@ export class MemoryAdapter implements Adapter {
119120
}
120121

121122
const acquiredAt = Date.now()
122-
this.#activeJobs.set(job.id, { job, acquiredAt })
123+
this.#activeJobs.set(job.id, { job, acquiredAt, queue })
123124

124125
return { ...job, acquiredAt }
125126
}
@@ -187,6 +188,10 @@ export class MemoryAdapter implements Adapter {
187188
let recovered = 0
188189

189190
for (const [jobId, active] of this.#activeJobs.entries()) {
191+
if (active.queue !== queue) {
192+
continue
193+
}
194+
190195
const isStalled = now - active.acquiredAt > stalledThreshold
191196

192197
if (!isStalled) {
@@ -219,7 +224,7 @@ export class MemoryAdapter implements Adapter {
219224

220225
async getJob(jobId: string, queue: string): Promise<JobRecord | null> {
221226
const active = this.#activeJobs.get(jobId)
222-
if (active) {
227+
if (active && active.queue === queue) {
223228
return { status: 'active', data: active.job }
224229
}
225230

tests/_utils/register_driver_test_suite.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,27 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
146146
assert.equal(record!.data.id, 'job-active')
147147
})
148148

149+
test('getJob should not return active job from another queue', async ({ assert }) => {
150+
const adapter = await options.createAdapter()
151+
adapter.setWorkerId('worker-1')
152+
153+
await adapter.pushOn('queue-a', {
154+
id: 'job-active-other-queue',
155+
name: 'TestJob',
156+
payload: {},
157+
attempts: 0,
158+
})
159+
160+
await adapter.popFrom('queue-a')
161+
162+
const wrongQueueRecord = await adapter.getJob('job-active-other-queue', 'queue-b')
163+
assert.isNull(wrongQueueRecord)
164+
165+
const rightQueueRecord = await adapter.getJob('job-active-other-queue', 'queue-a')
166+
assert.isNotNull(rightQueueRecord)
167+
assert.equal(rightQueueRecord!.status, 'active')
168+
})
169+
149170
test('getJob should return null for non-existent job', async ({ assert }) => {
150171
const adapter = await options.createAdapter()
151172
adapter.setWorkerId('worker-1')
@@ -592,6 +613,50 @@ export function registerDriverTestSuite(options: DriverTestSuiteOptions) {
592613
assert.isNull(job3)
593614
})
594615

616+
test('recoverStalledJobs should only recover jobs from the targeted queue', async ({ assert }) => {
617+
const adapter = await options.createAdapter()
618+
adapter.setWorkerId('worker-1')
619+
620+
await adapter.pushOn('queue-a', {
621+
id: 'job-stalled-a',
622+
name: 'TestJob',
623+
payload: null,
624+
attempts: 0,
625+
})
626+
await adapter.pushOn('queue-b', {
627+
id: 'job-stalled-b',
628+
name: 'TestJob',
629+
payload: null,
630+
attempts: 0,
631+
})
632+
633+
const jobA = await adapter.popFrom('queue-a')
634+
const jobB = await adapter.popFrom('queue-b')
635+
assert.isNotNull(jobA)
636+
assert.isNotNull(jobB)
637+
638+
await new Promise((resolve) => setTimeout(resolve, 50))
639+
640+
const recoveredA = await adapter.recoverStalledJobs('queue-a', 10, 1)
641+
assert.equal(recoveredA, 1)
642+
643+
const recoveredJobA = await adapter.popFrom('queue-a')
644+
assert.isNotNull(recoveredJobA)
645+
assert.equal(recoveredJobA!.id, 'job-stalled-a')
646+
647+
const queueBPending = await adapter.popFrom('queue-b')
648+
assert.isNull(queueBPending)
649+
650+
await new Promise((resolve) => setTimeout(resolve, 50))
651+
652+
const recoveredB = await adapter.recoverStalledJobs('queue-b', 10, 1)
653+
assert.equal(recoveredB, 1)
654+
655+
const recoveredJobB = await adapter.popFrom('queue-b')
656+
assert.isNotNull(recoveredJobB)
657+
assert.equal(recoveredJobB!.id, 'job-stalled-b')
658+
})
659+
595660
test('completeJob with undefined retention should remove job (default behavior)', async ({
596661
assert,
597662
}) => {

tests/fake_adapter.spec.ts

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { setTimeout } from 'node:timers/promises'
21
import { test } from '@japa/runner'
32
import { Job } from '../src/job.js'
43
import { fake } from '../src/drivers/fake_adapter.js'
@@ -126,71 +125,4 @@ test.group('FakeAdapter', () => {
126125
await adapter.destroy()
127126
})
128127

129-
test('should recover stalled jobs only for targeted queue', async ({ assert }) => {
130-
const adapter = fake()()
131-
132-
await adapter.pushOn('queue-a', {
133-
id: 'job-a',
134-
name: 'JobA',
135-
payload: null,
136-
attempts: 0,
137-
})
138-
139-
await adapter.pushOn('queue-b', {
140-
id: 'job-b',
141-
name: 'JobB',
142-
payload: null,
143-
attempts: 0,
144-
})
145-
146-
const jobA = await adapter.popFrom('queue-a')
147-
const jobB = await adapter.popFrom('queue-b')
148-
149-
assert.isNotNull(jobA)
150-
assert.isNotNull(jobB)
151-
152-
await setTimeout(2)
153-
154-
const recoveredA = await adapter.recoverStalledJobs('queue-a', 0, 1)
155-
assert.equal(recoveredA, 1)
156-
157-
const recoveredJobA = await adapter.popFrom('queue-a')
158-
assert.equal(recoveredJobA?.id, 'job-a')
159-
160-
const noneInB = await adapter.popFrom('queue-b')
161-
assert.isNull(noneInB)
162-
163-
await setTimeout(2)
164-
165-
const recoveredB = await adapter.recoverStalledJobs('queue-b', 0, 1)
166-
assert.equal(recoveredB, 1)
167-
168-
const recoveredJobB = await adapter.popFrom('queue-b')
169-
assert.equal(recoveredJobB?.id, 'job-b')
170-
171-
await adapter.destroy()
172-
})
173-
174-
test('should ignore active jobs from other queues in getJob', async ({ assert }) => {
175-
const adapter = fake()()
176-
177-
await adapter.pushOn('queue-a', {
178-
id: 'job-a',
179-
name: 'JobA',
180-
payload: null,
181-
attempts: 0,
182-
})
183-
184-
const jobA = await adapter.popFrom('queue-a')
185-
assert.isNotNull(jobA)
186-
187-
const wrongQueue = await adapter.getJob('job-a', 'queue-b')
188-
assert.isNull(wrongQueue)
189-
190-
const correctQueue = await adapter.getJob('job-a', 'queue-a')
191-
assert.isNotNull(correctQueue)
192-
assert.equal(correctQueue?.status, 'active')
193-
194-
await adapter.destroy()
195-
})
196128
})

0 commit comments

Comments
 (0)