Skip to content

Commit 4c97bb4

Browse files
authored
Impl touch method (#27)
1 parent ce82ece commit 4c97bb4

3 files changed

Lines changed: 157 additions & 0 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 })
154154
* `timeout` - number of seconds to wait for the task processing
155155
* returns task tuple or table (see retval) and boolean `was_processed` flag
156156

157+
* `space:touch(id, [attr])`
158+
- `id`:
159+
+ `string` | `number` - primary key
160+
+ `tuple` - key will be extracted using index
161+
- `attr`
162+
+ `increment` - the value of ttr and ttl (.runat) increased by increment seconds
163+
157164
### Admin methods
158165

159166
* `space:dig(id, [attr])` - dig out task from buried state

test/touch_test.lua

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
---@diagnostic disable: inject-field
2+
local fiber = require 'fiber'
3+
local clock = require 'clock'
4+
local xqueue = require 'xqueue'
5+
6+
require 'test.setup'
7+
8+
local t = require 'luatest' --[[@as luatest]]
9+
local g = t.group('touch')
10+
11+
local queue
12+
local F
13+
local default_time_to = 0.25
14+
15+
g.before_each(function()
16+
if box.space.queue then
17+
box.space.queue:truncate()
18+
for i = #box.space.queue.index, 0, -1 do
19+
local ind = box.space.queue.index[i]
20+
ind:drop()
21+
end
22+
box.space.queue:drop()
23+
end
24+
25+
queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]]
26+
---@class test.xqueue.delayed.tuple: box.tuple
27+
---@field id number
28+
---@field status string
29+
---@field runat number
30+
---@field payload any
31+
queue:format({
32+
{ name = 'id', type = 'unsigned' },
33+
{ name = 'status', type = 'string' },
34+
{ name = 'runat', type = 'number' },
35+
{ name = 'payload', type = 'any' },
36+
})
37+
38+
F = { id = 1, status = 2, runat = 3, payload = 4 }
39+
40+
queue:create_index('primary', { parts = {'id'} })
41+
queue:create_index('status', { parts = {'status', 'id'} })
42+
queue:create_index('runat', { parts = {'runat', 'id'} })
43+
44+
xqueue.upgrade(queue, {
45+
debug = true,
46+
fields = {
47+
runat = 'runat',
48+
status = 'status',
49+
},
50+
features = {
51+
id = 'time64',
52+
keep = true,
53+
ttr = default_time_to,
54+
ttl = default_time_to,
55+
},
56+
})
57+
end)
58+
59+
function g.test_touch_ttr()
60+
local puted_task = queue:put({payload = { 'x' }}) --[[@as test.xqueue.delayed.tuple]]
61+
t.assert_equals(puted_task.status, 'R', 'queue:put(...) must insert task in R status')
62+
63+
local task = queue:take({timeout=0})
64+
t.assert_equals(task.id, puted_task.id, 'queue not empty')
65+
t.assert_le(task.runat, clock.realtime() + default_time_to, 'taked task must have runat le than now+ttr')
66+
67+
-- honest worker
68+
local begin_at = clock.realtime()
69+
while clock.realtime() <= begin_at + default_time_to*4 do
70+
fiber.sleep(default_time_to - 0.01)
71+
queue:touch(task, {increment=default_time_to})
72+
end
73+
74+
local acked_task = queue:ack(task)
75+
t.assert_equals(acked_task.status, 'D', 'task must have Done status')
76+
end
77+
78+
function g.test_touch_ttl()
79+
local puted_task = queue:put({payload = { 'x' }}) --[[@as test.xqueue.delayed.tuple]]
80+
t.assert_equals(puted_task.status, 'R', 'queue:put(...) must insert task in R status')
81+
82+
local task
83+
local begin_at = clock.realtime()
84+
while clock.realtime() <= begin_at + default_time_to*4 do
85+
fiber.sleep(default_time_to - 0.01)
86+
task = queue:touch(puted_task, {increment=default_time_to})
87+
end
88+
89+
t.assert_equals(task.status, 'R')
90+
t.assert_gt(task.runat, puted_task.runat + default_time_to*4 - 0.01)
91+
fiber.sleep(default_time_to*2)
92+
t.assert_equals(queue:take(), nil, 'task must be removed by ttl')
93+
end

xqueue.lua

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,6 +1679,63 @@ function methods:bury(key, attr)
16791679
xq:putback(t)
16801680
end
16811681

1682+
--[[
1683+
* `space:touch(id, [attr])`
1684+
- `id`:
1685+
+ `string` | `number` - primary key
1686+
+ `tuple` - key will be extracted using index
1687+
- `attr`
1688+
+ `increment` - the value of ttr and ttl (.runat) increased by increment seconds
1689+
]]
1690+
1691+
---@param key table|scalar|box.tuple
1692+
---@param attr? { increment: number? }
1693+
---@return table|box.tuple
1694+
function methods:touch(key, attr)
1695+
local xq = self.xq
1696+
key = xq:getkey(key)
1697+
1698+
attr = attr or {}
1699+
1700+
local increment = 0
1701+
if type(attr.increment) ~= 'number' then
1702+
error("attr.increment must be number", 2)
1703+
end
1704+
if attr.increment < 0 then
1705+
error("attr.increment can't be negative", 2)
1706+
end
1707+
1708+
if attr.increment then
1709+
increment = attr.increment
1710+
end
1711+
1712+
local t = self:get(key)
1713+
if not t then
1714+
error(string.format( "Task {%s} was not found", key ),2)
1715+
end
1716+
1717+
local status = t[ xq.fields.status ]
1718+
if status == 'T' then
1719+
xq:check_owner(key)
1720+
end
1721+
1722+
-- delayed or ttl or default ttl
1723+
if xq.have_runat and (status == 'T') or (status == 'R') then
1724+
xq:atomic(key,function()
1725+
t = self:update({key}, {{ '+', xq.fields.runat, increment}})
1726+
1727+
---@cast t box.tuple
1728+
xq:wakeup(t)
1729+
if xq.have_runat then
1730+
xq.runat_chan:put(true,0)
1731+
end
1732+
log.info("Touch: {%s} run_at +%s seconds from %s/sid=%s/fid=%s", key, attr.increment, box.session.storage.peer, box.session.id(), fiber.id())
1733+
end)
1734+
end
1735+
1736+
return t
1737+
end
1738+
16821739
local function kick_task(self, key, attr)
16831740
local xq = self.xq
16841741
key = xq:getkey(key)

0 commit comments

Comments
 (0)