Skip to content

Commit d58da97

Browse files
committed
Make background fibers to work with read_only mode. closes gh-56
1 parent afac190 commit d58da97

File tree

4 files changed

+232
-163
lines changed

4 files changed

+232
-163
lines changed

queue/abstract/driver/fifottl.lua

Lines changed: 90 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,95 @@ function tube.create_space(space_name, opts)
8686
return space
8787
end
8888

89+
local delayed_state = { state.DELAYED }
90+
local ttl_states = { state.READY, state.BURIED }
91+
local ttr_state = { state.TAKEN }
92+
93+
local function fifottl_fiber_iteration(self, processed)
94+
local now = time()
95+
local task = nil
96+
local estimated = TIMEOUT_INFINITY
97+
98+
-- delayed tasks
99+
task = self.space.index.watch:min(delayed_state)
100+
if task and task[i_status] == state.DELAYED then
101+
if now >= task[i_next_event] then
102+
task = self.space:update(task[i_id], {
103+
{ '=', i_status, state.READY },
104+
{ '=', i_next_event, task[i_created] + task[i_ttl] }
105+
})
106+
self:on_task_change(task, 'delay')
107+
estimated = 0
108+
processed = processed + 1
109+
else
110+
estimated = tonumber(task[i_next_event] - now) / 1000000
111+
end
112+
end
113+
114+
-- ttl tasks
115+
for _, state in pairs(ttl_states) do
116+
task = self.space.index.watch:min{ state }
117+
if task ~= nil and task[i_status] == state then
118+
if now >= task[i_next_event] then
119+
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
120+
self:on_task_change(task, 'ttl')
121+
estimated = 0
122+
processed = processed + 1
123+
else
124+
local et = tonumber(task[i_next_event] - now) / 1000000
125+
estimated = et < estimated and et or estimated
126+
end
127+
end
128+
end
129+
130+
-- ttr tasks
131+
task = self.space.index.watch:min(ttr_state)
132+
if task and task[i_status] == state.TAKEN then
133+
if now >= task[i_next_event] then
134+
task = self.space:update(task[i_id], {
135+
{ '=', i_status, state.READY },
136+
{ '=', i_next_event, task[i_created] + task[i_ttl] }
137+
})
138+
self:on_task_change(task, 'ttr')
139+
estimated = 0
140+
processed = processed + 1
141+
else
142+
local et = tonumber(task[i_next_event] - now) / 1000000
143+
estimated = et < estimated and et or estimated
144+
end
145+
end
146+
147+
if estimated > 0 or processed > 1000 then
148+
-- free refcounter
149+
estimated = estimated > 0 and estimated or 0
150+
processed = 0
151+
fiber.sleep(estimated)
152+
end
153+
154+
return processed
155+
end
156+
157+
-- watch fiber
158+
local function fifottl_fiber(self)
159+
fiber.name('fifottl')
160+
log.info("Started queue fifottl fiber")
161+
local processed = 0
162+
163+
while true do
164+
if not box.cfg.read_only then
165+
local stat, err = pcall(fifottl_fiber_iteration, self, processed)
166+
if not stat and not err.code == box.error.READONLY then
167+
log.error("error catched: %s", tostring(err))
168+
log.error("exiting fiber '%s'", fiber.name())
169+
return 1
170+
elseif stat then
171+
processed = err
172+
end
173+
else
174+
fiber.sleep(0.1)
175+
end
176+
end
177+
end
89178

90179
-- start tube on space
91180
function tube.new(space, on_task_change, opts)
@@ -106,85 +195,11 @@ function tube.new(space, on_task_change, opts)
106195
opts = opts,
107196
}, { __index = method })
108197

109-
self.fiber = fiber.create(self._fiber, self)
198+
self.fiber = fiber.create(fifottl_fiber, self)
110199

111200
return self
112201
end
113202

114-
-- watch fiber
115-
function method._fiber(self)
116-
fiber.name('fifottl')
117-
log.info("Started queue fifottl fiber")
118-
local estimated
119-
local ttl_statuses = { state.READY, state.BURIED }
120-
local now, task
121-
local processed = 0
122-
123-
while true do
124-
estimated = TIMEOUT_INFINITY
125-
now = time()
126-
127-
-- delayed tasks
128-
task = self.space.index.watch:min{ state.DELAYED }
129-
if task and task[i_status] == state.DELAYED then
130-
if now >= task[i_next_event] then
131-
task = self.space:update(task[i_id], {
132-
{ '=', i_status, state.READY },
133-
{ '=', i_next_event, task[i_created] + task[i_ttl] }
134-
})
135-
self:on_task_change(task, 'delay')
136-
estimated = 0
137-
processed = processed + 1
138-
else
139-
estimated = tonumber(task[i_next_event] - now) / 1000000
140-
end
141-
end
142-
143-
-- ttl tasks
144-
for _, state in pairs(ttl_statuses) do
145-
task = self.space.index.watch:min{ state }
146-
if task ~= nil and task[i_status] == state then
147-
if now >= task[i_next_event] then
148-
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
149-
self:on_task_change(task, 'ttl')
150-
estimated = 0
151-
processed = processed + 1
152-
else
153-
local et = tonumber(task[i_next_event] - now) / 1000000
154-
estimated = et < estimated and et or estimated
155-
end
156-
end
157-
end
158-
159-
-- ttr tasks
160-
task = self.space.index.watch:min{ state.TAKEN }
161-
if task and task[i_status] == state.TAKEN then
162-
if now >= task[i_next_event] then
163-
task = self.space:update(task[i_id], {
164-
{ '=', i_status, state.READY },
165-
{ '=', i_next_event, task[i_created] + task[i_ttl] }
166-
})
167-
self:on_task_change(task, 'ttr')
168-
estimated = 0
169-
processed = processed + 1
170-
else
171-
local et = tonumber(task[i_next_event] - now) / 1000000
172-
estimated = et < estimated and et or estimated
173-
end
174-
end
175-
176-
177-
if estimated > 0 or processed > 1000 then
178-
-- free refcounter
179-
estimated = estimated > 0 and estimated or 0
180-
processed = 0
181-
task = nil
182-
fiber.sleep(estimated)
183-
end
184-
end
185-
end
186-
187-
188203
-- cleanup internal fields in task
189204
function method.normalize_task(self, task)
190205
return task and task:transform(3, 5)

queue/abstract/driver/utubettl.lua

Lines changed: 103 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,96 @@ function tube.create_space(space_name, opts)
9393
return space
9494
end
9595

96+
local delayed_state = { state.DELAYED }
97+
local ttl_states = { state.READY, state.BURIED }
98+
local ttr_state = { state.TAKEN }
99+
100+
local function utubettl_fiber_iteration(self, processed)
101+
local now = time()
102+
local task = nil
103+
local estimated = TIMEOUT_INFINITY
104+
105+
-- delayed tasks
106+
task = self.space.index.watch:min(delayed_state)
107+
if task and task[i_status] == state.DELAYED then
108+
if now >= task[i_next_event] then
109+
task = self.space:update(task[i_id], {
110+
{ '=', i_status, state.READY },
111+
{ '=', i_next_event, task[i_created] + task[i_ttl] }
112+
})
113+
self:on_task_change(task, 'delayed')
114+
estimated = 0
115+
processed = processed + 1
116+
else
117+
estimated = tonumber(task[i_next_event] - now) / 1000000
118+
end
119+
end
120+
121+
-- ttl tasks
122+
for _, state in pairs(ttl_states) do
123+
task = self.space.index.watch:min{ state }
124+
if task ~= nil and task[i_status] == state then
125+
if now >= task[i_next_event] then
126+
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
127+
self:on_task_change(task, 'ttl')
128+
estimated = 0
129+
processed = processed + 1
130+
else
131+
local et = tonumber(task[i_next_event] - now) / 1000000
132+
estimated = et < estimated and et or estimated
133+
end
134+
end
135+
end
136+
137+
-- ttr tasks
138+
task = self.space.index.watch:min(ttr_state)
139+
if task and task[i_status] == state.TAKEN then
140+
if now >= task[i_next_event] then
141+
task = self.space:update(task[i_id], {
142+
{ '=', i_status, state.READY },
143+
{ '=', i_next_event, task[i_created] + task[i_ttl] }
144+
})
145+
self:on_task_change(task, 'ttr')
146+
estimated = 0
147+
processed = processed + 1
148+
else
149+
local et = tonumber(task[i_next_event] - now) / 1000000
150+
estimated = et < estimated and et or estimated
151+
end
152+
end
153+
154+
if estimated > 0 or processed > 1000 then
155+
-- free refcounter
156+
estimated = processed > 1000 and 0 or estimated
157+
estimated = estimated > 0 and estimated or 0
158+
fiber.sleep(estimated)
159+
end
160+
161+
return processed
162+
end
163+
164+
-- watch fiber
165+
local function utubettl_fiber(self)
166+
fiber.name('utubettl')
167+
log.info("Started queue utubettl fiber")
168+
local processed = 0
169+
170+
while true do
171+
if not box.cfg.read_only then
172+
local stat, err = pcall(utubettl_fiber_iteration, self, processed)
173+
if not stat and not err.code == box.error.READONLY then
174+
log.error("error catched: %s", tostring(err))
175+
log.error("exiting fiber '%s'", fiber.name())
176+
return 1
177+
elseif stat then
178+
processed = err
179+
end
180+
else
181+
fiber.sleep(0.1)
182+
end
183+
end
184+
end
185+
96186
-- start tube on space
97187
function tube.new(space, on_task_change, opts)
98188
on_task_change = on_task_change or (function() end)
@@ -110,97 +200,13 @@ function tube.new(space, on_task_change, opts)
110200
on_task_change(task, stat_data)
111201
end,
112202
opts = opts,
113-
}, { __index = method})
203+
}, { __index = method })
114204

115-
self.fiber = fiber.create(self._fiber, self)
205+
self.fiber = fiber.create(utubettl_fiber, self)
116206

117207
return self
118208
end
119209

120-
local function process_neighbour(self, task, operation)
121-
self:on_task_change(task, operation)
122-
if task ~= nil then
123-
local neighbour = self.space.index.utube:min{state.READY, task[i_utube]}
124-
if neighbour ~= nil and neighbour[i_status] == state.READY then
125-
self:on_task_change(neighbour)
126-
end
127-
end
128-
return task
129-
end
130-
131-
-- watch fiber
132-
function method._fiber(self)
133-
fiber.name('fifottl')
134-
log.info("Started queue utubettl fiber")
135-
local estimated
136-
local ttl_statuses = { state.READY, state.BURIED }
137-
local now, task
138-
local processed = 0
139-
140-
while true do
141-
estimated = TIMEOUT_INFINITY
142-
now = time()
143-
144-
-- delayed tasks
145-
task = self.space.index.watch:min{ state.DELAYED }
146-
if task and task[i_status] == state.DELAYED then
147-
if now >= task[i_next_event] then
148-
task = self.space:update(task[i_id], {
149-
{ '=', i_status, state.READY },
150-
{ '=', i_next_event, task[i_created] + task[i_ttl] }
151-
})
152-
self:on_task_change(task, 'delayed')
153-
estimated = 0
154-
processed = processed + 1
155-
else
156-
estimated = tonumber(task[i_next_event] - now) / 1000000
157-
end
158-
end
159-
160-
-- ttl tasks
161-
for _, state in pairs(ttl_statuses) do
162-
task = self.space.index.watch:min{ state }
163-
if task ~= nil and task[i_status] == state then
164-
if now >= task[i_next_event] then
165-
task = self:delete(task[i_id]):transform(2, 1, state.DONE)
166-
self:on_task_change(task, 'ttl')
167-
estimated = 0
168-
processed = processed + 1
169-
else
170-
local et = tonumber(task[i_next_event] - now) / 1000000
171-
estimated = et < estimated and et or estimated
172-
end
173-
end
174-
end
175-
176-
-- ttr tasks
177-
task = self.space.index.watch:min{ state.TAKEN }
178-
if task and task[i_status] == state.TAKEN then
179-
if now >= task[i_next_event] then
180-
task = self.space:update(task[i_id], {
181-
{ '=', i_status, state.READY },
182-
{ '=', i_next_event, task[i_created] + task[i_ttl] }
183-
})
184-
self:on_task_change(task, 'ttr')
185-
estimated = 0
186-
processed = processed + 1
187-
else
188-
local et = tonumber(task[i_next_event] - now) / 1000000
189-
estimated = et < estimated and et or estimated
190-
end
191-
end
192-
193-
if estimated > 0 or processed > 1000 then
194-
-- free refcounter
195-
estimated = processed > 1000 and 0 or estimated
196-
estimated = estimated > 0 and estimated or 0
197-
processed = 0
198-
task = nil
199-
fiber.sleep(estimated)
200-
end
201-
end
202-
end
203-
204210
-- cleanup internal fields in task
205211
function method.normalize_task(self, task)
206212
return task and task:transform(i_next_event, i_data - i_next_event)
@@ -284,6 +290,17 @@ function method.take(self)
284290
end
285291
end
286292

293+
local function process_neighbour(self, task, operation)
294+
self:on_task_change(task, operation)
295+
if task ~= nil then
296+
local neighbour = self.space.index.utube:min{state.READY, task[i_utube]}
297+
if neighbour ~= nil and neighbour[i_status] == state.READY then
298+
self:on_task_change(neighbour)
299+
end
300+
end
301+
return task
302+
end
303+
287304
-- delete task
288305
function method.delete(self, id)
289306
local task = self.space:delete(id)

0 commit comments

Comments
 (0)