Skip to content

Commit 078784d

Browse files
committed
Use fiber.cond instead of fiber.wakeup, if possible
closes gh-52
1 parent 94cd455 commit 078784d

File tree

4 files changed

+93
-34
lines changed

4 files changed

+93
-34
lines changed

queue/abstract.lua

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ local fun = require('fun')
33
local fiber = require('fiber')
44

55
local state = require('queue.abstract.state')
6-
local num_type = require('queue.compat').num_type
7-
local str_type = require('queue.compat').str_type
6+
7+
local qc = require('queue.compat')
8+
local num_type = qc.num_type
9+
local str_type = qc.str_type
810

911
local session = box.session
1012

@@ -70,6 +72,8 @@ function tube.put(self, data, opts)
7072
return self.raw:normalize_task(task)
7173
end
7274

75+
local conds = {}
76+
7377
function tube.take(self, timeout)
7478
timeout = time(timeout or TIMEOUT_INFINITY)
7579
local task = self.raw:take()
@@ -80,12 +84,15 @@ function tube.take(self, timeout)
8084
while timeout > 0 do
8185
local started = fiber.time64()
8286
local time = event_time(timeout)
83-
local tube_id = self.tube_id
87+
local tid = self.tube_id
88+
local fid = fiber.id()
89+
local sid = session.id()
8490

85-
box.space._queue_consumers:insert{
86-
session.id(), fiber.id(), tube_id, time, fiber.time64() }
87-
fiber.sleep(tonumber(timeout) / 1000000)
88-
box.space._queue_consumers:delete{ session.id(), fiber.id() }
91+
box.space._queue_consumers:insert{sid, fid, tid, time, started}
92+
conds[fid] = qc.waiter()
93+
conds[fid]:wait(tonumber(timeout) / 1000000)
94+
conds[fid]:free()
95+
box.space._queue_consumers:delete{ sid, fid }
8996

9097
task = self.raw:take()
9198

@@ -295,8 +302,11 @@ local function make_self(driver, space, tube_name, tube_type, tube_id, opts)
295302

296303
if consumer ~= nil then
297304
if consumer[3] == tube_id then
298-
fiber.find(consumer[2]):wakeup()
299305
queue_consumers:delete{consumer[1], consumer[2]}
306+
local cond = conds[consumer[2]]
307+
if cond then
308+
cond:signal(consumer[2])
309+
end
300310
end
301311
end
302312
-- task swicthed to taken - registry in taken space
@@ -340,9 +350,9 @@ function method._on_consumer_disconnect()
340350
break
341351
end
342352
box.space._queue_consumers:delete{ waiter[1], waiter[2] }
343-
fb = fiber.find(waiter[2])
344-
if fb ~= nil and fb:status() ~= 'dead' then
345-
fb:wakeup()
353+
local cond = conds[waiter[2]]
354+
if cond then
355+
cond:signal(waiter[2])
346356
end
347357
end
348358

queue/abstract/driver/fifottl.lua

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ local log = require('log')
22
local fiber = require('fiber')
33
local state = require('queue.abstract.state')
44

5-
local num_type = require('queue.compat').num_type
6-
local str_type = require('queue.compat').str_type
5+
local qc = require('queue.compat')
6+
local num_type = qc.num_type
7+
local str_type = qc.str_type
78

89
local tube = {}
910
local method = {}
@@ -147,8 +148,7 @@ local function fifottl_fiber_iteration(self, processed)
147148
if estimated > 0 or processed > 1000 then
148149
-- free refcounter
149150
estimated = estimated > 0 and estimated or 0
150-
processed = 0
151-
fiber.sleep(estimated)
151+
self.cond:wait(estimated)
152152
end
153153

154154
return processed
@@ -183,18 +183,15 @@ function tube.new(space, on_task_change, opts)
183183
space = space,
184184
on_task_change = function(self, task, stats_data)
185185
-- wakeup fiber
186-
if task ~= nil then
187-
if self.fiber ~= nil then
188-
if self.fiber:id() ~= fiber.id() then
189-
self.fiber:wakeup()
190-
end
191-
end
186+
if task ~= nil and self.fiber ~= nil then
187+
self.cond:signal(self.fiber:id())
192188
end
193189
on_task_change(task, stats_data)
194190
end,
195191
opts = opts,
196192
}, { __index = method })
197193

194+
self.cond = qc.waiter()
198195
self.fiber = fiber.create(fifottl_fiber, self)
199196

200197
return self

queue/abstract/driver/utubettl.lua

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ local log = require('log')
22
local fiber = require('fiber')
33

44
local state = require('queue.abstract.state')
5-
local num_type = require('queue.compat').num_type
6-
local str_type = require('queue.compat').str_type
5+
6+
local qc = require('queue.compat')
7+
local num_type = qc.num_type
8+
local str_type = qc.str_type
79

810
local tube = {}
911
local method = {}
@@ -155,7 +157,7 @@ local function utubettl_fiber_iteration(self, processed)
155157
-- free refcounter
156158
estimated = processed > 1000 and 0 or estimated
157159
estimated = estimated > 0 and estimated or 0
158-
fiber.sleep(estimated)
160+
self.cond:wait(estimated)
159161
end
160162

161163
return processed
@@ -190,18 +192,15 @@ function tube.new(space, on_task_change, opts)
190192
space = space,
191193
on_task_change = function(self, task, stat_data)
192194
-- wakeup fiber
193-
if task ~= nil then
194-
if self.fiber ~= nil then
195-
if self.fiber:id() ~= fiber.id() then
196-
self.fiber:wakeup()
197-
end
198-
end
195+
if task ~= nil and self.fiber ~= nil then
196+
self.cond:signal(self.fiber:id())
199197
end
200198
on_task_change(task, stat_data)
201199
end,
202200
opts = opts,
203201
}, { __index = method })
204202

203+
self.cond = qc.waiter()
205204
self.fiber = fiber.create(utubettl_fiber, self)
206205

207206
return self

queue/compat.lua

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
local fun = require('fun')
2-
local log = require('log')
3-
local json = require('json')
1+
local fun = require('fun')
2+
local log = require('log')
3+
local json = require('json')
4+
local fiber = require('fiber')
45

56
local iter, op = fun.iter, fun.operator
67

7-
function split(self, sep)
8+
local function split(self, sep)
89
local sep, fields = sep or ":", {}
910
local pattern = string.format("([^%s]+)", sep)
1011
self:gsub(pattern, function(c) table.insert(fields, c) end)
@@ -56,6 +57,57 @@ local function pack_args(...)
5657
return check_version({1, 7}) and { ... } or ...
5758
end
5859

60+
local waiter_list = {}
61+
62+
local function waiter_new()
63+
return setmetatable({
64+
cond = fiber.cond()
65+
}, {
66+
__index = {
67+
wait = function(self, timeout)
68+
self.cond:wait(timeout)
69+
end,
70+
signal = function(self, wfiber)
71+
self.cond:signal()
72+
end,
73+
free = function(self)
74+
if #waiter_list < 100 then
75+
table.insert(waiter_list, self)
76+
end
77+
end
78+
}
79+
})
80+
end
81+
82+
local function waiter_old()
83+
return setmetatable({}, {
84+
__index = {
85+
wait = function(self, timeout)
86+
fiber.sleep(timeout)
87+
end,
88+
signal = function(self, fid)
89+
local wfiber = fiber.find(fid)
90+
if wfiber ~= nil and
91+
wfiber:status() ~= 'dead' and
92+
wfiber:id() ~= fiber.id() then
93+
wfiber:wakeup()
94+
end
95+
end,
96+
free = function(self)
97+
if #waiter_list < 100 then
98+
table.insert(waiter_list, self)
99+
end
100+
end
101+
}
102+
})
103+
end
104+
105+
local waiter_actual = check_version({1, 7, 2}) and waiter_new or waiter_old
106+
107+
local function waiter()
108+
return table.remove(waiter_list) or waiter_actual()
109+
end
110+
59111
return {
60112
split_version = split_version,
61113
check_version = check_version,
@@ -65,4 +117,5 @@ return {
65117
snapdir_optname = get_optname_snapdir,
66118
logger_optname = get_optname_logger,
67119
pack_args = pack_args,
120+
waiter = waiter
68121
}

0 commit comments

Comments
 (0)