Skip to content

Commit 5df7bea

Browse files
committed
feat: add .id() method for job deduplication
1 parent c4e7c3f commit 5df7bea

File tree

10 files changed

+444
-7
lines changed

10 files changed

+444
-7
lines changed

README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ npm install @boringnode/queue
2626
- **Priority Queues**: Process high-priority jobs first
2727
- **Bulk Dispatch**: Efficiently dispatch thousands of jobs at once
2828
- **Job Grouping**: Organize related jobs for monitoring
29+
- **Job Deduplication**: Prevent duplicate jobs with custom IDs
2930
- **Retry with Backoff**: Exponential, linear, or fixed backoff strategies
3031
- **Job Timeout**: Fail or retry jobs that exceed a time limit
3132
- **Job History**: Retain completed/failed jobs for debugging
@@ -131,6 +132,37 @@ await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')
131132

132133
The `groupId` is stored with job data and accessible via `job.data.groupId`.
133134

135+
## Job Deduplication
136+
137+
Prevent the same job from being pushed to the queue twice using custom job IDs:
138+
139+
```typescript
140+
// First dispatch - job is created
141+
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
142+
143+
// Second dispatch with same ID - silently skipped
144+
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
145+
```
146+
147+
The custom ID is automatically prefixed with the job name, so different job types can use the same ID without conflicts:
148+
149+
```typescript
150+
// These are two different jobs, no conflict
151+
await SendInvoiceJob.dispatch({ orderId: 123 }).id('order-123').run()
152+
await SendReceiptJob.dispatch({ orderId: 123 }).id('order-123').run()
153+
```
154+
155+
Deduplication is atomic and race-condition-free across all adapters:
156+
157+
- **Redis**: Uses `HSETNX` (set-if-not-exists)
158+
- **Knex**: Uses `INSERT ... ON CONFLICT DO NOTHING`
159+
160+
> [!NOTE]
161+
> Without `.id()`, jobs use auto-generated UUIDs and are never deduplicated. The `.id()` method is only available on single dispatch, not `dispatchMany`.
162+
163+
> [!TIP]
164+
> When job retention is enabled (`removeOnComplete: false`), completed jobs remain in storage. A re-dispatch with the same custom ID will be silently skipped since the record still exists. With the default retention (`true`), completed jobs are removed immediately, so re-dispatch with the same ID succeeds normally.
165+
134166
## Job History & Retention
135167

136168
Keep completed and failed jobs for debugging:

src/drivers/fake_adapter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@ export class FakeAdapter implements Adapter {
160160
}
161161

162162
async pushOn(queue: string, jobData: JobData): Promise<void> {
163+
if (jobData.unique) {
164+
const jobs = this.#queues.get(queue)
165+
if (jobs?.some((j) => j.id === jobData.id)) return
166+
}
167+
163168
this.#recordPush(queue, jobData)
164169
this.#enqueue(queue, jobData)
165170
}
@@ -169,6 +174,14 @@ export class FakeAdapter implements Adapter {
169174
}
170175

171176
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
177+
if (jobData.unique) {
178+
const jobs = this.#queues.get(queue)
179+
if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve()
180+
181+
const delayed = this.#delayedJobs.get(queue)
182+
if (delayed?.has(jobData.id)) return Promise.resolve()
183+
}
184+
172185
this.#recordPush(queue, jobData, delay)
173186
this.#schedulePush(queue, jobData, delay)
174187

src/drivers/knex_adapter.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,19 @@ export class KnexAdapter implements Adapter {
370370
const timestamp = Date.now()
371371
const score = calculateScore(priority, timestamp)
372372

373-
await this.#connection(this.#jobsTable).insert({
373+
const query = this.#connection(this.#jobsTable).insert({
374374
id: jobData.id,
375375
queue,
376376
status: 'pending',
377377
data: JSON.stringify(jobData),
378378
score,
379379
})
380+
381+
if (jobData.unique) {
382+
await query.onConflict(['id', 'queue']).ignore()
383+
} else {
384+
await query
385+
}
380386
}
381387

382388
async pushLater(jobData: JobData, delay: number): Promise<void> {
@@ -386,13 +392,19 @@ export class KnexAdapter implements Adapter {
386392
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
387393
const executeAt = Date.now() + delay
388394

389-
await this.#connection(this.#jobsTable).insert({
395+
const query = this.#connection(this.#jobsTable).insert({
390396
id: jobData.id,
391397
queue,
392398
status: 'delayed',
393399
data: JSON.stringify(jobData),
394400
execute_at: executeAt,
395401
})
402+
403+
if (jobData.unique) {
404+
await query.onConflict(['id', 'queue']).ignore()
405+
} else {
406+
await query
407+
}
396408
}
397409

398410
async pushMany(jobs: JobData[]): Promise<void> {

src/drivers/redis_adapter.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,26 @@ const PUSH_JOB_SCRIPT = `
3535
return 1
3636
`
3737

38+
/**
39+
* Lua script for pushing a unique job.
40+
* Uses HSETNX to only store data if the job doesn't already exist.
41+
* Only adds to pending ZSET if the job was newly created.
42+
*/
43+
const PUSH_UNIQUE_JOB_SCRIPT = `
44+
local data_key = KEYS[1]
45+
local pending_key = KEYS[2]
46+
local job_id = ARGV[1]
47+
local job_data = ARGV[2]
48+
local score = tonumber(ARGV[3])
49+
50+
local added = redis.call('HSETNX', data_key, job_id, job_data)
51+
if added == 1 then
52+
redis.call('ZADD', pending_key, score, job_id)
53+
end
54+
55+
return added
56+
`
57+
3858
/**
3959
* Lua script for pushing a delayed job.
4060
* Stores job data in the central hash and adds jobId to delayed ZSET.
@@ -52,6 +72,26 @@ const PUSH_DELAYED_JOB_SCRIPT = `
5272
return 1
5373
`
5474

75+
/**
76+
* Lua script for pushing a unique delayed job.
77+
* Uses HSETNX to only store data if the job doesn't already exist.
78+
* Only adds to delayed ZSET if the job was newly created.
79+
*/
80+
const PUSH_UNIQUE_DELAYED_JOB_SCRIPT = `
81+
local data_key = KEYS[1]
82+
local delayed_key = KEYS[2]
83+
local job_id = ARGV[1]
84+
local job_data = ARGV[2]
85+
local execute_at = tonumber(ARGV[3])
86+
87+
local added = redis.call('HSETNX', data_key, job_id, job_data)
88+
if added == 1 then
89+
redis.call('ZADD', delayed_key, execute_at, job_id)
90+
end
91+
92+
return added
93+
`
94+
5595
/**
5696
* Lua script for atomic job acquisition.
5797
* 1. Check and process delayed jobs
@@ -620,8 +660,10 @@ export class RedisAdapter implements Adapter {
620660
const keys = this.#getKeys(queue)
621661
const executeAt = Date.now() + delay
622662

663+
const script = jobData.unique ? PUSH_UNIQUE_DELAYED_JOB_SCRIPT : PUSH_DELAYED_JOB_SCRIPT
664+
623665
await this.#connection.eval(
624-
PUSH_DELAYED_JOB_SCRIPT,
666+
script,
625667
2,
626668
keys.data,
627669
keys.delayed,
@@ -637,8 +679,10 @@ export class RedisAdapter implements Adapter {
637679
const timestamp = Date.now()
638680
const score = calculateScore(priority, timestamp)
639681

682+
const script = jobData.unique ? PUSH_UNIQUE_JOB_SCRIPT : PUSH_JOB_SCRIPT
683+
640684
await this.#connection.eval(
641-
PUSH_JOB_SCRIPT,
685+
script,
642686
2,
643687
keys.data,
644688
keys.pending,

src/job_dispatcher.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class JobDispatcher<T> {
4747
#delay?: Duration
4848
#priority?: number
4949
#groupId?: string
50+
#id?: string
5051

5152
/**
5253
* Create a new job dispatcher.
@@ -148,6 +149,40 @@ export class JobDispatcher<T> {
148149
return this
149150
}
150151

152+
/**
153+
* Set a custom job ID for deduplication.
154+
*
155+
* When a custom ID is provided, the adapter will silently skip
156+
* the job if one with the same ID already exists in the queue.
157+
* The ID is automatically prefixed with the job name to prevent
158+
* collisions between different job types.
159+
*
160+
* @param jobId - Custom identifier for this job
161+
* @returns This dispatcher for chaining
162+
*
163+
* @example
164+
* ```typescript
165+
* // Prevent duplicate invoice jobs for the same order
166+
* await SendInvoiceJob.dispatch({ orderId: 123 })
167+
* .id('order-123')
168+
* .run()
169+
*
170+
* // Second dispatch with same ID is silently skipped
171+
* await SendInvoiceJob.dispatch({ orderId: 123 })
172+
* .id('order-123')
173+
* .run()
174+
* ```
175+
*/
176+
id(jobId: string): this {
177+
if (!jobId) {
178+
throw new Error('Job ID must be a non-empty string')
179+
}
180+
181+
this.#id = jobId
182+
183+
return this
184+
}
185+
151186
/**
152187
* Use a specific adapter for this job.
153188
*
@@ -181,7 +216,7 @@ export class JobDispatcher<T> {
181216
* ```
182217
*/
183218
async run(): Promise<DispatchResult> {
184-
const id = randomUUID()
219+
const id = this.#id ? `${this.#name}::${this.#id}` : randomUUID()
185220

186221
debug('dispatching job %s with id %s using payload %s', this.#name, id, this.#payload)
187222

@@ -197,6 +232,7 @@ export class JobDispatcher<T> {
197232
priority: this.#priority,
198233
groupId: this.#groupId,
199234
createdAt: Date.now(),
235+
...(this.#id ? { unique: true } : {}),
200236
}
201237

202238
const message: JobDispatchMessage = { jobs: [jobData], queue: this.#queue, delay: parsedDelay }

src/types/main.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ export interface JobData {
133133
* Injected by OTel plugin at dispatch time.
134134
*/
135135
traceContext?: Record<string, string>
136+
137+
/**
138+
* When true, adapters use atomic insert-if-not-exists semantics
139+
* to silently skip duplicate jobs with the same ID.
140+
* Set automatically when a custom job ID is provided via `.id()`.
141+
*/
142+
unique?: boolean
136143
}
137144

138145
/**

tests/_mocks/memory_adapter.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ export class MemoryAdapter implements Adapter {
5151
}
5252

5353
async pushOn(queue: string, jobData: JobData): Promise<void> {
54+
if (jobData.unique) {
55+
const jobs = this.#queues.get(queue)
56+
if (jobs?.some((j) => j.id === jobData.id)) return
57+
}
58+
5459
if (!this.#queues.has(queue)) {
5560
this.#queues.set(queue, [])
5661
}
@@ -63,6 +68,14 @@ export class MemoryAdapter implements Adapter {
6368
}
6469

6570
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
71+
if (jobData.unique) {
72+
const jobs = this.#queues.get(queue)
73+
if (jobs?.some((j) => j.id === jobData.id)) return Promise.resolve()
74+
75+
const delayed = this.#delayedJobs.get(queue)
76+
if (delayed?.has(jobData.id)) return Promise.resolve()
77+
}
78+
6679
if (!this.#delayedJobs.has(queue)) {
6780
this.#delayedJobs.set(queue, new Map())
6881
}

0 commit comments

Comments
 (0)