Skip to content

Commit c6c148d

Browse files
save fibers in the local table
1 parent 805caa4 commit c6c148d

2 files changed

Lines changed: 68 additions & 77 deletions

File tree

metrics/plugins/graphite.lua

Lines changed: 55 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ local fiber = require('fiber')
33
local metrics = require('metrics')
44
local checks = require('checks')
55
local log = require('log')
6-
local fun = require('fun')
76

87
local graphite = {}
98

@@ -16,6 +15,30 @@ local DEFAULT_SEND_INTERVAL = 2
1615
-- Constants
1716
local LABELS_SEP = ';'
1817

18+
local GRAPHITE_FIBERS = {}
19+
20+
local function create_fiber_table(opts)
21+
local graphite_fiber = {}
22+
23+
if opts ~= nil then
24+
graphite_fiber.prefix = opts.prefix or DEFAULT_PREFIX
25+
graphite_fiber.host = opts.host or DEFAULT_HOST
26+
graphite_fiber.port = opts.port or DEFAULT_PORT
27+
graphite_fiber.send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
28+
graphite_fiber.name = 'metrics_graphite_worker' .. '_' ..
29+
graphite_fiber.prefix .. '_' .. graphite_fiber.host .. '_' ..
30+
graphite_fiber.port .. '_' .. graphite_fiber.send_interval
31+
else
32+
graphite_fiber.prefix = DEFAULT_PREFIX
33+
graphite_fiber.host = DEFAULT_HOST
34+
graphite_fiber.port = DEFAULT_PORT
35+
graphite_fiber.send_interval = DEFAULT_SEND_INTERVAL
36+
graphite_fiber.name = 'metrics_graphite_worker'
37+
end
38+
39+
return graphite_fiber
40+
end
41+
1942
function graphite.format_observation(prefix, obs)
2043
local metric_path = #prefix > 0 and ('%s.%s'):format(prefix, obs.metric_name) or obs.metric_name
2144

@@ -36,41 +59,23 @@ function graphite.format_observation(prefix, obs)
3659
return graph
3760
end
3861

39-
local function create_fiber_name(opts)
40-
local fiber_name
41-
42-
if opts ~= nil then
43-
local prefix = opts.prefix or DEFAULT_PREFIX
44-
local host = opts.host or DEFAULT_HOST
45-
local port = opts.port or DEFAULT_PORT
46-
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
47-
48-
fiber_name = 'metrics_graphite_worker' .. '_' ..
49-
prefix .. '_' .. host .. '_' .. port .. '_' .. send_interval
50-
else
51-
fiber_name = 'metrics_graphite_worker'
52-
end
53-
54-
return fiber_name
55-
end
56-
57-
local function graphite_worker(opts)
58-
fiber.name(create_fiber_name(opts))
62+
local function graphite_worker(args)
63+
fiber.name(args.name)
5964

6065
while true do
6166
metrics.invoke_callbacks()
6267
for _, c in pairs(metrics.collectors()) do
6368
for _, obs in ipairs(c:collect()) do
64-
local data = graphite.format_observation(opts.prefix, obs)
65-
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
69+
local data = graphite.format_observation(args.prefix, obs)
70+
local numbytes = args.sock:sendto(args.host, args.port, data)
6671
if numbytes == nil then
6772
log.error('Error while sending to host %s port %s data %s',
68-
opts.host, opts.port, data)
73+
args.host, args.port, data)
6974
end
7075
end
7176
end
7277

73-
fiber.sleep(opts.send_interval)
78+
fiber.sleep(args.send_interval)
7479
end
7580
end
7681

@@ -82,25 +87,27 @@ function graphite.init(opts)
8287
send_interval = '?number'
8388
}
8489

90+
local graphite_fiber = create_fiber_table(opts)
91+
8592
local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
8693
assert(sock ~= nil, 'Socket creation failed')
8794

88-
local prefix = opts.prefix or DEFAULT_PREFIX
89-
local host = opts.host or DEFAULT_HOST
90-
local port = opts.port or DEFAULT_PORT
91-
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
92-
93-
fun.iter(fiber.info()):
94-
filter(function(_, x) return x.name == create_fiber_name(opts) end):
95-
each(function(x) fiber.kill(x) end)
95+
for _, v in ipairs(GRAPHITE_FIBERS) do
96+
if v.name == graphite_fiber.name then
97+
error('failed to start fiber: ' .. graphite_fiber.name .. ", already exist")
98+
end
99+
end
96100

97-
fiber.create(graphite_worker, {
98-
prefix = prefix,
101+
graphite_fiber.fiber = fiber.create(graphite_worker, {
102+
name = graphite_fiber.name,
103+
prefix = graphite_fiber.prefix,
99104
sock = sock,
100-
host = host,
101-
port = port,
102-
send_interval = send_interval,
105+
host = graphite_fiber.host,
106+
port = graphite_fiber.port,
107+
send_interval = graphite_fiber.send_interval,
103108
})
109+
110+
table.insert(GRAPHITE_FIBERS, graphite_fiber)
104111
end
105112

106113
function graphite.stop(opts)
@@ -111,9 +118,16 @@ function graphite.stop(opts)
111118
send_interval = '?number'
112119
}
113120

114-
fun.iter(fiber.info()):
115-
filter(function(_, x) return x.name == create_fiber_name(opts) end):
116-
each(function(x) fiber.kill(x) end)
121+
local graphite_fiber = create_fiber_table(opts)
122+
123+
for _, v in ipairs(GRAPHITE_FIBERS) do
124+
if v.name == graphite_fiber.name then
125+
fiber.kill(v.fiber)
126+
return
127+
end
128+
end
129+
130+
error("failed to stop fiber: " .. graphite_fiber.name .. ", doesn't exist")
117131
end
118132

119133
return graphite

test/plugins/graphite_test.lua

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -164,38 +164,6 @@ g.test_graphite_sends_data_to_socket = function(cg)
164164
sock:close()
165165
end
166166

167-
g.test_graphite_kills_previous_fibers_on_init = function(cg)
168-
cg.server:exec(function()
169-
local fiber = require('fiber')
170-
local fun = require('fun')
171-
local graphite = require('metrics.plugins.graphite')
172-
173-
local function mock_graphite_worker()
174-
fiber.create(function()
175-
fiber.name('metrics_graphite_worker_tarantool_127.0.0.1_2003_2')
176-
fiber.sleep(math.huge)
177-
end)
178-
end
179-
180-
local function count_workers()
181-
return fun.iter(fiber.info()):
182-
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
183-
length()
184-
end
185-
186-
graphite.stop()
187-
188-
t.assert_equals(count_workers(), 0)
189-
mock_graphite_worker()
190-
mock_graphite_worker()
191-
t.assert_equals(count_workers(), 2)
192-
193-
graphite.init({})
194-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
195-
t.assert_equals(count_workers(), 1)
196-
end)
197-
end
198-
199167
g.test_graphite_stop_default_fibers = function(cg)
200168
cg.server:exec(function()
201169
local fiber = require('fiber')
@@ -238,17 +206,26 @@ g.test_graphite_stop_custom_fiber = function(cg)
238206
send_interval = 1,
239207
}
240208

209+
local opts2 = {
210+
prefix = "tarantool",
211+
host = "127.0.0.1",
212+
port = 4444,
213+
send_interval = 1,
214+
}
215+
241216
t.assert_equals(count_workers(), 0)
242217

243218
graphite.init(opts)
219+
graphite.init(opts2)
244220

245-
t.assert_equals(count_workers(), 1)
246-
247-
graphite.stop()
248-
t.assert_equals(count_workers(), 1) -- fiber still works
221+
t.assert_equals(count_workers(), 2)
249222

250223
graphite.stop(opts)
251224
fiber.yield() -- let cancelled fibers disappear from fiber.info()
225+
t.assert_equals(count_workers(), 1)
226+
227+
graphite.stop(opts2)
228+
fiber.yield() -- let cancelled fibers disappear from fiber.info()
252229
t.assert_equals(count_workers(), 0)
253230
end)
254231
end

0 commit comments

Comments
 (0)