Skip to content

Commit f3c85cd

Browse files
committed
perf(redis): update claim schedule script to use sorted set index
1 parent b2c0e84 commit f3c85cd

1 file changed

Lines changed: 81 additions & 59 deletions

File tree

src/drivers/redis_scripts.ts

Lines changed: 81 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js'
1+
import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js';
22

33
/**
44
* Lua script for pushing a job to the queue.
@@ -18,7 +18,7 @@ ${REDIS_JOB_STORAGE_LUA}
1818
redis.call('ZADD', pending_key, score, job_id)
1919
2020
return 1
21-
`
21+
`;
2222

2323
/**
2424
* Lua script for pushing a dedup job.
@@ -80,7 +80,7 @@ ${REDIS_DEDUP_LUA}
8080
redis.call('PEXPIRE', dedup_key, ttl)
8181
end
8282
return {'added', job_id}
83-
`
83+
`;
8484

8585
/**
8686
* Lua script for pushing a delayed job.
@@ -100,7 +100,7 @@ ${REDIS_JOB_STORAGE_LUA}
100100
redis.call('ZADD', delayed_key, execute_at, job_id)
101101
102102
return 1
103-
`
103+
`;
104104

105105
/**
106106
* Lua script for atomic job acquisition.
@@ -158,7 +158,7 @@ ${REDIS_JOB_STORAGE_LUA}
158158
return encode_job_result(job_data, overlay_key, job_id, {
159159
acquiredAt = now
160160
})
161-
`
161+
`;
162162

163163
/**
164164
* Lua script for removing a job completely (no history).
@@ -193,7 +193,7 @@ ${REDIS_JOB_STORAGE_LUA}
193193
delete_job_data(data_key, overlay_key, job_id)
194194
195195
return 1
196-
`
196+
`;
197197

198198
/**
199199
* Lua script for finalizing a job in history.
@@ -277,7 +277,7 @@ ${REDIS_JOB_STORAGE_LUA}
277277
end
278278
279279
return 1
280-
`
280+
`;
281281

282282
/**
283283
* Lua script for retrying a job.
@@ -330,7 +330,7 @@ ${REDIS_JOB_STORAGE_LUA}
330330
end
331331
332332
return 1
333-
`
333+
`;
334334

335335
/**
336336
* Lua script for recovering stalled jobs.
@@ -399,7 +399,7 @@ ${REDIS_JOB_STORAGE_LUA}
399399
end
400400
401401
return recovered
402-
`
402+
`;
403403

404404
/**
405405
* Lua script for getting a job record with its status.
@@ -458,77 +458,99 @@ ${REDIS_JOB_STORAGE_LUA}
458458
finishedAt = finished_at,
459459
error = error_msg
460460
})
461-
`
461+
`;
462462

463463
/**
464-
* Lua script for atomically claiming a due schedule.
465-
* Iterates the schedule index server-side and claims the first due schedule.
466-
* Returns the schedule data if claimed, nil otherwise.
464+
* Lua script for atomically claiming a due schedule using a sorted set index.
465+
*
466+
* Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N)
467+
* lookup instead of scanning all schedule hashes via SMEMBERS.
468+
*
469+
* Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on
470+
* sight so subsequent calls skip them.
471+
*
472+
* KEYS[1] = schedules::due (the ZSET)
473+
* KEYS[2] = schedule key prefix (e.g. "schedules::")
474+
* ARGV[1] = now (epoch milliseconds)
467475
*/
468476
export const CLAIM_SCHEDULE_SCRIPT = `
469-
local schedules_index_key = KEYS[1]
470-
local schedule_key_prefix = KEYS[2]
477+
local due_key = KEYS[1]
478+
local prefix = KEYS[2]
471479
local now = tonumber(ARGV[1])
472480
473-
local ids = redis.call('SMEMBERS', schedules_index_key)
481+
while true do
482+
local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1)
483+
484+
if #candidates == 0 then
485+
return nil
486+
end
474487
475-
for i = 1, #ids do
476-
local schedule_key = schedule_key_prefix .. ids[i]
488+
local id = candidates[1]
489+
local schedule_key = prefix .. id
477490
478491
-- Get schedule data
479492
local data = redis.call('HGETALL', schedule_key)
480-
if #data > 0 then
493+
494+
-- Deleted schedule still in ZSET
495+
if #data == 0 then
496+
redis.call('ZREM', due_key, id)
497+
else
481498
-- Convert HGETALL result to table
482499
local schedule = {}
483500
for j = 1, #data, 2 do
484501
schedule[data[j]] = data[j + 1]
485502
end
486503
487-
-- Check if schedule is due
488-
if schedule.status == 'active' then
489-
local next_run_at = tonumber(schedule.next_run_at)
490-
491-
if next_run_at and next_run_at <= now then
492-
local run_count = tonumber(schedule.run_count or '0')
493-
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
494-
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil
495-
496-
-- Check limits
497-
if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then
498-
-- This schedule is claimable - atomically update it
499-
local new_run_count = run_count + 1
500-
501-
-- Calculate new next_run_at (simple interval-based for now)
502-
-- Complex cron calculation happens in the caller
503-
local new_next_run_at = ''
504-
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
505-
if every_ms then
506-
new_next_run_at = tostring(now + every_ms)
507-
end
508-
509-
-- Check if we've hit the limit after this run
510-
if run_limit and new_run_count >= run_limit then
511-
new_next_run_at = ''
512-
end
504+
-- Check if schedule is active
505+
if schedule.status ~= 'active' then
506+
redis.call('ZREM', due_key, id)
507+
else
508+
local run_count = tonumber(schedule.run_count or '0')
509+
local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil
510+
local to_date = schedule.to_date and tonumber(schedule.to_date) or nil
511+
512+
-- Check limits
513+
if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then
514+
redis.call('ZREM', due_key, id)
515+
else
516+
-- This schedule is claimable - atomically update it
517+
local new_run_count = run_count + 1
518+
519+
-- Calculate new next_run_at (simple interval-based for now)
520+
-- Complex cron calculation happens in the caller
521+
local new_next_run_at = ''
522+
local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil
523+
if every_ms then
524+
new_next_run_at = tostring(now + every_ms)
525+
end
513526
514-
-- Check if past end date
515-
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
516-
new_next_run_at = ''
517-
end
527+
-- Check if we've hit the limit after this run
528+
if run_limit and new_run_count >= run_limit then
529+
new_next_run_at = ''
530+
end
518531
519-
-- Update the schedule atomically
520-
redis.call('HSET', schedule_key,
521-
'next_run_at', new_next_run_at,
522-
'last_run_at', tostring(now),
523-
'run_count', tostring(new_run_count))
532+
-- Check if past end date
533+
if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then
534+
new_next_run_at = ''
535+
end
524536
525-
-- Return the schedule data (before update) as JSON
526-
return cjson.encode(schedule)
537+
-- Update the schedule atomically
538+
redis.call('HSET', schedule_key,
539+
'next_run_at', new_next_run_at,
540+
'last_run_at', tostring(now),
541+
'run_count', tostring(new_run_count))
542+
543+
-- Update or remove from ZSET
544+
if new_next_run_at ~= '' then
545+
redis.call('ZADD', due_key, tonumber(new_next_run_at), id)
546+
else
547+
redis.call('ZREM', due_key, id)
527548
end
549+
550+
-- Return the schedule data (before update) as JSON
551+
return cjson.encode(schedule)
528552
end
529553
end
530554
end
531555
end
532-
533-
return nil
534-
`
556+
`;

0 commit comments

Comments
 (0)