Skip to content

Commit 3799fef

Browse files
save fibers in the local table
1 parent 805caa4 commit 3799fef

2 files changed

Lines changed: 82 additions & 86 deletions

File tree

metrics/plugins/graphite.lua

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ local fiber = require('fiber')
33
local metrics = require('metrics')
44
local checks = require('checks')
55
local log = require('log')
6-
local fun = require('fun')
76

87
local graphite = {}
98

@@ -16,6 +15,30 @@ local DEFAULT_SEND_INTERVAL = 2
1615
-- Constants
1716
local LABELS_SEP = ';'
1817

18+
local GRAPHITE_FIBERS = {}
19+
20+
local function create_fiber_table(opts)
21+
local graphite_fiber = {}
22+
23+
if opts ~= nil then
24+
graphite_fiber.prefix = opts.prefix or DEFAULT_PREFIX
25+
graphite_fiber.host = opts.host or DEFAULT_HOST
26+
graphite_fiber.port = opts.port or DEFAULT_PORT
27+
graphite_fiber.send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
28+
graphite_fiber.name = 'metrics_graphite_worker' .. '_' ..
29+
graphite_fiber.prefix .. '_' .. graphite_fiber.host .. '_' ..
30+
graphite_fiber.port .. '_' .. graphite_fiber.send_interval
31+
else
32+
graphite_fiber.prefix = DEFAULT_PREFIX
33+
graphite_fiber.host = DEFAULT_HOST
34+
graphite_fiber.port = DEFAULT_PORT
35+
graphite_fiber.send_interval = DEFAULT_SEND_INTERVAL
36+
graphite_fiber.name = 'metrics_graphite_worker'
37+
end
38+
39+
return graphite_fiber
40+
end
41+
1942
function graphite.format_observation(prefix, obs)
2043
local metric_path = #prefix > 0 and ('%s.%s'):format(prefix, obs.metric_name) or obs.metric_name
2144

@@ -36,41 +59,23 @@ function graphite.format_observation(prefix, obs)
3659
return graph
3760
end
3861

39-
local function create_fiber_name(opts)
40-
local fiber_name
41-
42-
if opts ~= nil then
43-
local prefix = opts.prefix or DEFAULT_PREFIX
44-
local host = opts.host or DEFAULT_HOST
45-
local port = opts.port or DEFAULT_PORT
46-
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
47-
48-
fiber_name = 'metrics_graphite_worker' .. '_' ..
49-
prefix .. '_' .. host .. '_' .. port .. '_' .. send_interval
50-
else
51-
fiber_name = 'metrics_graphite_worker'
52-
end
53-
54-
return fiber_name
55-
end
56-
57-
local function graphite_worker(opts)
58-
fiber.name(create_fiber_name(opts))
62+
local function graphite_worker(args)
63+
fiber.name(args.name)
5964

6065
while true do
6166
metrics.invoke_callbacks()
6267
for _, c in pairs(metrics.collectors()) do
6368
for _, obs in ipairs(c:collect()) do
64-
local data = graphite.format_observation(opts.prefix, obs)
65-
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
69+
local data = graphite.format_observation(args.prefix, obs)
70+
local numbytes = args.sock:sendto(args.host, args.port, data)
6671
if numbytes == nil then
6772
log.error('Error while sending to host %s port %s data %s',
68-
opts.host, opts.port, data)
73+
args.host, args.port, data)
6974
end
7075
end
7176
end
7277

73-
fiber.sleep(opts.send_interval)
78+
fiber.sleep(args.send_interval)
7479
end
7580
end
7681

@@ -82,25 +87,32 @@ function graphite.init(opts)
8287
send_interval = '?number'
8388
}
8489

90+
local graphite_fiber = create_fiber_table(opts)
91+
8592
local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
8693
assert(sock ~= nil, 'Socket creation failed')
8794

88-
local prefix = opts.prefix or DEFAULT_PREFIX
89-
local host = opts.host or DEFAULT_HOST
90-
local port = opts.port or DEFAULT_PORT
91-
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
92-
93-
fun.iter(fiber.info()):
94-
filter(function(_, x) return x.name == create_fiber_name(opts) end):
95-
each(function(x) fiber.kill(x) end)
95+
-- for _, v in ipairs(GRAPHITE_FIBERS) do
96+
-- if v.name == graphite_fiber.name then
97+
-- error('failed to start fiber: ' .. graphite_fiber.name .. ", already exist")
98+
-- end
99+
-- end
100+
for name in pairs(GRAPHITE_FIBERS) do
101+
if name == graphite_fiber.name then
102+
error('failed to start fiber: ' .. graphite_fiber.name .. ", already exist")
103+
end
104+
end
96105

97-
fiber.create(graphite_worker, {
98-
prefix = prefix,
106+
graphite_fiber.fiber = fiber.create(graphite_worker, {
107+
name = graphite_fiber.name,
108+
prefix = graphite_fiber.prefix,
99109
sock = sock,
100-
host = host,
101-
port = port,
102-
send_interval = send_interval,
110+
host = graphite_fiber.host,
111+
port = graphite_fiber.port,
112+
send_interval = graphite_fiber.send_interval,
103113
})
114+
115+
GRAPHITE_FIBERS[graphite_fiber.name] = graphite_fiber
104116
end
105117

106118
function graphite.stop(opts)
@@ -111,9 +123,17 @@ function graphite.stop(opts)
111123
send_interval = '?number'
112124
}
113125

114-
fun.iter(fiber.info()):
115-
filter(function(_, x) return x.name == create_fiber_name(opts) end):
116-
each(function(x) fiber.kill(x) end)
126+
local graphite_fiber = create_fiber_table(opts)
127+
128+
for name in pairs(GRAPHITE_FIBERS) do
129+
if name == graphite_fiber.name then
130+
fiber.kill(GRAPHITE_FIBERS[name].fiber)
131+
GRAPHITE_FIBERS[name] = nil
132+
return
133+
end
134+
end
135+
136+
error("failed to stop fiber: " .. graphite_fiber.name .. ", doesn't exist")
117137
end
118138

119139
return graphite

test/plugins/graphite_test.lua

Lines changed: 21 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,8 @@ end)
3636

3737
g.after_each(function(cg)
3838
cg.server:exec(function()
39-
local fiber = require('fiber')
40-
local fun = require('fun')
41-
local metrics = require('metrics')
4239
-- Delete all collectors and global labels
43-
metrics.clear()
44-
45-
fun.iter(fiber.info()):
46-
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
47-
each(function(x) fiber.kill(x) end)
48-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
40+
require('metrics').clear()
4941
end)
5042
end)
5143

@@ -153,46 +145,21 @@ g.test_graphite_sends_data_to_socket = function(cg)
153145

154146
local cnt = metrics.counter('test_cnt', 'test-cnt')
155147
cnt:inc(1)
148+
156149
graphite.init({port = port})
157150
end, {port})
158151

159-
require('fiber').sleep(0.5)
152+
sock:readable(2)
153+
160154
local graphite_obs = sock:recvfrom(50)
161155
local obs_table = graphite_obs:split(' ')
162156
t.assert_equals(obs_table[1], 'tarantool.test_cnt')
163157
t.assert_equals(obs_table[2], '1')
164158
sock:close()
165-
end
166159

167-
g.test_graphite_kills_previous_fibers_on_init = function(cg)
168160
cg.server:exec(function()
169-
local fiber = require('fiber')
170-
local fun = require('fun')
171-
local graphite = require('metrics.plugins.graphite')
172-
173-
local function mock_graphite_worker()
174-
fiber.create(function()
175-
fiber.name('metrics_graphite_worker_tarantool_127.0.0.1_2003_2')
176-
fiber.sleep(math.huge)
177-
end)
178-
end
179-
180-
local function count_workers()
181-
return fun.iter(fiber.info()):
182-
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
183-
length()
184-
end
185-
186-
graphite.stop()
187-
188-
t.assert_equals(count_workers(), 0)
189-
mock_graphite_worker()
190-
mock_graphite_worker()
191-
t.assert_equals(count_workers(), 2)
192-
193-
graphite.init({})
194-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
195-
t.assert_equals(count_workers(), 1)
161+
require('metrics.plugins.graphite').stop({port = 22003})
162+
require('fiber').yield()
196163
end)
197164
end
198165

@@ -214,7 +181,7 @@ g.test_graphite_stop_default_fibers = function(cg)
214181
t.assert_equals(count_workers(), 1)
215182

216183
graphite.stop({})
217-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
184+
fiber.yield()
218185
t.assert_equals(count_workers(), 0)
219186
end)
220187
end
@@ -238,17 +205,26 @@ g.test_graphite_stop_custom_fiber = function(cg)
238205
send_interval = 1,
239206
}
240207

208+
local opts2 = {
209+
prefix = "tarantool",
210+
host = "127.0.0.1",
211+
port = 4444,
212+
send_interval = 1,
213+
}
214+
241215
t.assert_equals(count_workers(), 0)
242216

243217
graphite.init(opts)
218+
graphite.init(opts2)
244219

245-
t.assert_equals(count_workers(), 1)
246-
247-
graphite.stop()
248-
t.assert_equals(count_workers(), 1) -- fiber still works
220+
t.assert_equals(count_workers(), 2)
249221

250222
graphite.stop(opts)
251-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
223+
fiber.yield()
224+
t.assert_equals(count_workers(), 1)
225+
226+
graphite.stop(opts2)
227+
fiber.yield()
252228
t.assert_equals(count_workers(), 0)
253229
end)
254230
end

0 commit comments

Comments
 (0)