Skip to content

Commit 422a360

Browse files
committed
fix: check all job states in dedup guard for fake and memory adapters
1 parent 62e9697 commit 422a360

File tree

3 files changed

+12
-17
lines changed

3 files changed

+12
-17
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,11 @@ await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
152152
await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run()
153153
```
154154

155-
Deduplication is atomic and race-condition-free across all adapters:
155+
Deduplication is atomic and race-condition-free for adapters that support storage-level uniqueness checks:
156156

157157
- **Redis**: Uses `HSETNX` (set-if-not-exists)
158158
- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING`
159+
- **SyncAdapter**: Executes jobs inline and does not support deduplication
159160

160161
> [!NOTE]
161162
> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`.

src/drivers/fake_adapter.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,8 @@ export class FakeAdapter implements Adapter {
161161

162162
async pushOn(queue: string, jobData: JobData): Promise<void> {
163163
if (jobData.unique) {
164-
const jobs = this.#queues.get(queue)
165-
if (jobs?.some((j) => j.id === jobData.id)) return
164+
const existing = await this.getJob(jobData.id, queue)
165+
if (existing) return
166166
}
167167

168168
this.#recordPush(queue, jobData)
@@ -173,13 +173,10 @@ export class FakeAdapter implements Adapter {
173173
return this.pushLaterOn('default', jobData, delay)
174174
}
175175

176-
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
176+
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
177177
if (jobData.unique) {
178-
const jobs = this.#queues.get(queue)
179-
if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve()
180-
181-
const delayed = this.#delayedJobs.get(queue)
182-
if (delayed?.has(jobData.id)) return Promise.resolve()
178+
const existing = await this.getJob(jobData.id, queue)
179+
if (existing) return
183180
}
184181

185182
this.#recordPush(queue, jobData, delay)

tests/_mocks/memory_adapter.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ export class MemoryAdapter implements Adapter {
5252

5353
async pushOn(queue: string, jobData: JobData): Promise<void> {
5454
if (jobData.unique) {
55-
const jobs = this.#queues.get(queue)
56-
if (jobs?.some((j) => j.id === jobData.id)) return
55+
const existing = await this.getJob(jobData.id, queue)
56+
if (existing) return
5757
}
5858

5959
if (!this.#queues.has(queue)) {
@@ -67,13 +67,10 @@ export class MemoryAdapter implements Adapter {
6767
return this.pushLaterOn('default', jobData, delay)
6868
}
6969

70-
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
70+
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
7171
if (jobData.unique) {
72-
const jobs = this.#queues.get(queue)
73-
if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve()
74-
75-
const delayed = this.#delayedJobs.get(queue)
76-
if (delayed?.has(jobData.id)) return Promise.resolve()
72+
const existing = await this.getJob(jobData.id, queue)
73+
if (existing) return
7774
}
7875

7976
if (!this.#delayedJobs.has(queue)) {

0 commit comments

Comments
 (0)