Skip to content

Commit c41fc8b

Browse files
committed
feat(adapter): add knex adapter
1 parent 8cd76b8 commit c41fc8b

File tree

6 files changed

+913
-23
lines changed

6 files changed

+913
-23
lines changed

compose.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,13 @@ services:
22
redis:
33
image: redis:alpine
44
ports:
5-
- "6379:6379"
5+
- "6379:6379"
6+
7+
postgres:
8+
image: postgres:alpine
9+
ports:
10+
- "5432:5432"
11+
environment:
12+
POSTGRES_USER: postgres
13+
POSTGRES_PASSWORD: postgres
14+
POSTGRES_DB: queue_test

examples/config.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,29 @@
1-
import type { QueueManagerConfig } from '#types/main'
2-
import { Redis } from 'ioredis'
31
import { redis } from '#drivers/redis_adapter'
42
import { sync } from '#drivers/sync_adapter'
5-
6-
export const redisConnection = new Redis({
7-
host: 'localhost',
8-
port: 6379,
9-
keyPrefix: 'boringnode::queue::',
10-
db: 0,
11-
})
3+
import { knex } from '#drivers/knex_adapter'
4+
import type { QueueManagerConfig } from '#types/main'
125

136
export const config: QueueManagerConfig = {
14-
default: 'redis',
7+
default: 'knex',
158

169
adapters: {
1710
sync: sync(),
18-
redis: redis(redisConnection),
11+
redis: redis({
12+
host: 'localhost',
13+
port: 6379,
14+
keyPrefix: 'boringnode::queue::',
15+
db: 0,
16+
}),
17+
knex: knex({
18+
client: 'pg',
19+
connection: {
20+
host: process.env.PG_HOST || 'localhost',
21+
port: Number.parseInt(process.env.PG_PORT || '5432', 10),
22+
user: process.env.PG_USER || 'postgres',
23+
password: process.env.PG_PASSWORD || 'postgres',
24+
database: process.env.PG_DATABASE || 'queue_test',
25+
},
26+
}),
1927
},
2028

2129
worker: {

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,17 @@
4646
"@japa/expect-type": "^2.0.3",
4747
"@japa/file-system": "^2.3.2",
4848
"@japa/runner": "^4.4.0",
49+
"@types/better-sqlite3": "^7.6.13",
4950
"@types/node": "^24.3.1",
51+
"@types/pg": "^8",
52+
"better-sqlite3": "^12.5.0",
5053
"bullmq": "^5.65.1",
5154
"c8": "^10.1.3",
5255
"del-cli": "^7.0.0",
5356
"eslint": "^9.35.0",
5457
"ioredis": "^5.7.0",
58+
"knex": "^3.1.0",
59+
"pg": "^8.16.3",
5560
"prettier": "^3.6.2",
5661
"release-it": "^19.0.4",
5762
"testcontainers": "^11.5.1",

src/drivers/knex_adapter.ts

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
import KnexPkg from 'knex'
2+
import type { Knex } from 'knex'
3+
import type { Adapter, AcquiredJob } from '#contracts/adapter'
4+
import type { JobData } from '#types/main'
5+
6+
export interface KnexAdapterOptions {
7+
connection: Knex
8+
tableName?: string
9+
ownsConnection?: boolean
10+
}
11+
12+
type KnexConfig = Knex | Knex.Config
13+
14+
/**
15+
* Create a new Knex adapter factory.
16+
* Accepts either a Knex instance or a Knex configuration object.
17+
*
18+
* When passing a config object, the adapter will create and manage
19+
* the connection lifecycle (closing it on destroy).
20+
*
21+
* When passing a Knex instance, the caller is responsible for
22+
* managing the connection lifecycle.
23+
*/
24+
export function knex(config: KnexConfig, tableName?: string) {
25+
return () => {
26+
const isKnexInstance = typeof config === 'function'
27+
const connection = isKnexInstance ? config : KnexPkg(config)
28+
return new KnexAdapter({ connection, tableName, ownsConnection: !isKnexInstance })
29+
}
30+
}
31+
32+
/**
33+
* Knex adapter for the queue system.
34+
* Stores jobs in a SQL database using Knex.
35+
*/
36+
export class KnexAdapter implements Adapter {
37+
readonly #connection: Knex
38+
readonly #tableName: string
39+
readonly #ownsConnection: boolean
40+
#workerId: string = ''
41+
#initialized: boolean = false
42+
43+
constructor(config: KnexAdapterOptions) {
44+
this.#connection = config.connection
45+
this.#tableName = config.tableName ?? 'queue_jobs'
46+
this.#ownsConnection = config.ownsConnection ?? false
47+
}
48+
49+
setWorkerId(workerId: string): void {
50+
this.#workerId = workerId
51+
}
52+
53+
/**
54+
* Ensure the jobs table exists.
55+
* Creates it if not exists, handles race conditions.
56+
*/
57+
async #ensureTableExists(): Promise<void> {
58+
if (this.#initialized) return
59+
60+
try {
61+
await this.#connection.schema.createTable(this.#tableName, (table) => {
62+
table.string('id', 255).notNullable()
63+
table.string('queue', 255).notNullable()
64+
table.enu('status', ['pending', 'active', 'delayed']).notNullable()
65+
table.text('data').notNullable()
66+
table.bigint('score').unsigned().nullable()
67+
table.string('worker_id', 255).nullable()
68+
table.bigint('acquired_at').unsigned().nullable()
69+
table.bigint('execute_at').unsigned().nullable()
70+
table.primary(['id', 'queue'])
71+
table.index(['queue', 'status', 'score'])
72+
table.index(['queue', 'status', 'execute_at'])
73+
})
74+
} catch {
75+
/**
76+
* If table creation fails, verify the table actually exists.
77+
* This handles race conditions where multiple instances try to create
78+
* the table simultaneously.
79+
*/
80+
const hasTable = await this.#connection.schema.hasTable(this.#tableName)
81+
if (!hasTable) {
82+
throw new Error(`Failed to create table "${this.#tableName}"`)
83+
}
84+
}
85+
86+
this.#initialized = true
87+
}
88+
89+
async destroy(): Promise<void> {
90+
if (this.#ownsConnection) {
91+
await this.#connection.destroy()
92+
}
93+
}
94+
95+
async pop(): Promise<AcquiredJob | null> {
96+
return this.popFrom('default')
97+
}
98+
99+
async popFrom(queue: string): Promise<AcquiredJob | null> {
100+
await this.#ensureTableExists()
101+
102+
const now = Date.now()
103+
104+
// First, move ready delayed jobs to pending
105+
await this.#processDelayedJobs(queue, now)
106+
107+
// Use a transaction to atomically pop a job
108+
return this.#connection.transaction(async (trx) => {
109+
// Select the highest priority job (lowest score)
110+
const job = await trx(this.#tableName)
111+
.where('queue', queue)
112+
.where('status', 'pending')
113+
.orderBy('score', 'asc')
114+
.first()
115+
116+
if (!job) {
117+
return null
118+
}
119+
120+
// Update job to active status
121+
await trx(this.#tableName)
122+
.where('id', job.id)
123+
.where('queue', queue)
124+
.update({
125+
status: 'active',
126+
worker_id: this.#workerId,
127+
acquired_at: now,
128+
})
129+
130+
const jobData: JobData = JSON.parse(job.data)
131+
132+
return {
133+
...jobData,
134+
acquiredAt: now,
135+
}
136+
})
137+
}
138+
139+
async #processDelayedJobs(queue: string, now: number): Promise<void> {
140+
// Get all ready delayed jobs
141+
const delayedJobs = await this.#connection(this.#tableName)
142+
.where('queue', queue)
143+
.where('status', 'delayed')
144+
.where('execute_at', '<=', now)
145+
.select('id', 'data')
146+
147+
if (delayedJobs.length === 0) return
148+
149+
// Move them to pending
150+
for (const job of delayedJobs) {
151+
const jobData: JobData = JSON.parse(job.data)
152+
const priority = jobData.priority ?? 5
153+
const score = priority * 1e13 + now
154+
155+
await this.#connection(this.#tableName)
156+
.where('id', job.id)
157+
.where('queue', queue)
158+
.update({
159+
status: 'pending',
160+
score,
161+
execute_at: null,
162+
})
163+
}
164+
}
165+
166+
async completeJob(jobId: string, queue: string): Promise<void> {
167+
await this.#ensureTableExists()
168+
169+
await this.#connection(this.#tableName)
170+
.where('id', jobId)
171+
.where('queue', queue)
172+
.delete()
173+
}
174+
175+
async failJob(jobId: string, queue: string, _error?: Error): Promise<void> {
176+
await this.#ensureTableExists()
177+
178+
await this.#connection(this.#tableName)
179+
.where('id', jobId)
180+
.where('queue', queue)
181+
.delete()
182+
}
183+
184+
async retryJob(jobId: string, queue: string, retryAt?: Date): Promise<void> {
185+
await this.#ensureTableExists()
186+
187+
const now = Date.now()
188+
189+
// Get the active job
190+
const activeJob = await this.#connection(this.#tableName)
191+
.where('id', jobId)
192+
.where('queue', queue)
193+
.where('status', 'active')
194+
.first()
195+
196+
if (!activeJob) return
197+
198+
const jobData: JobData = JSON.parse(activeJob.data)
199+
jobData.attempts = (jobData.attempts || 0) + 1
200+
201+
const updatedData = JSON.stringify(jobData)
202+
203+
if (retryAt && retryAt.getTime() > now) {
204+
// Move to delayed
205+
await this.#connection(this.#tableName)
206+
.where('id', jobId)
207+
.where('queue', queue)
208+
.update({
209+
status: 'delayed',
210+
data: updatedData,
211+
worker_id: null,
212+
acquired_at: null,
213+
score: null,
214+
execute_at: retryAt.getTime(),
215+
})
216+
} else {
217+
// Move back to pending
218+
const priority = jobData.priority ?? 5
219+
const score = priority * 1e13 + now
220+
221+
await this.#connection(this.#tableName)
222+
.where('id', jobId)
223+
.where('queue', queue)
224+
.update({
225+
status: 'pending',
226+
data: updatedData,
227+
worker_id: null,
228+
acquired_at: null,
229+
score,
230+
execute_at: null,
231+
})
232+
}
233+
}
234+
235+
async push(jobData: JobData): Promise<void> {
236+
return this.pushOn('default', jobData)
237+
}
238+
239+
async pushOn(queue: string, jobData: JobData): Promise<void> {
240+
await this.#ensureTableExists()
241+
242+
const priority = jobData.priority ?? 5
243+
const timestamp = Date.now()
244+
const score = priority * 1e13 + timestamp
245+
246+
await this.#connection(this.#tableName).insert({
247+
id: jobData.id,
248+
queue,
249+
status: 'pending',
250+
data: JSON.stringify(jobData),
251+
score,
252+
})
253+
}
254+
255+
async pushLater(jobData: JobData, delay: number): Promise<void> {
256+
return this.pushLaterOn('default', jobData, delay)
257+
}
258+
259+
async pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void> {
260+
await this.#ensureTableExists()
261+
262+
const executeAt = Date.now() + delay
263+
264+
await this.#connection(this.#tableName).insert({
265+
id: jobData.id,
266+
queue,
267+
status: 'delayed',
268+
data: JSON.stringify(jobData),
269+
execute_at: executeAt,
270+
})
271+
}
272+
273+
async size(): Promise<number> {
274+
return this.sizeOf('default')
275+
}
276+
277+
async sizeOf(queue: string): Promise<number> {
278+
await this.#ensureTableExists()
279+
280+
const result = await this.#connection(this.#tableName)
281+
.where('queue', queue)
282+
.where('status', 'pending')
283+
.count('* as count')
284+
.first()
285+
286+
return Number(result?.count ?? 0)
287+
}
288+
}

0 commit comments

Comments
 (0)