Skip to content

Commit 7109b97

Browse files
graphite: multiple servers support
This patch adds the ability to send metrics to the multiple servers. Part of #TNTP-6584
1 parent 9c744c2 commit 7109b97

3 files changed

Lines changed: 137 additions & 41 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
88

99
### Added
1010

11+
- `graphite`: ability to send metrics to the multiple servers.
12+
1113
### Changed
1214

1315
### Fixed

metrics/plugins/graphite.lua

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ local fiber = require('fiber')
33
local metrics = require('metrics')
44
local checks = require('checks')
55
local log = require('log')
6-
local fun = require('fun')
76

87
local graphite = {}
98

@@ -16,6 +15,29 @@ local DEFAULT_SEND_INTERVAL = 2
1615
-- Constants
1716
local LABELS_SEP = ';'
1817

18+
local GRAPHITE_FIBERS = {}
19+
20+
local function create_fiber_table(opts)
21+
opts = opts or {}
22+
local needLongName = (opts ~= nil)
23+
local graphite_fiber = {}
24+
25+
graphite_fiber.sock = nil
26+
27+
graphite_fiber.prefix = opts.prefix or DEFAULT_PREFIX
28+
graphite_fiber.host = opts.host or DEFAULT_HOST
29+
graphite_fiber.port = opts.port or DEFAULT_PORT
30+
graphite_fiber.send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
31+
32+
graphite_fiber.name = "metrics_graphite_worker"
33+
if needLongName then
34+
graphite_fiber.name = graphite_fiber.name .. '_' ..
35+
graphite_fiber.prefix .. '_' .. graphite_fiber.host .. '_' .. graphite_fiber.port
36+
end
37+
38+
return graphite_fiber
39+
end
40+
1941
function graphite.format_observation(prefix, obs)
2042
local metric_path = #prefix > 0 and ('%s.%s'):format(prefix, obs.metric_name) or obs.metric_name
2143

@@ -36,23 +58,23 @@ function graphite.format_observation(prefix, obs)
3658
return graph
3759
end
3860

39-
local function graphite_worker(opts)
40-
fiber.name('metrics_graphite_worker')
61+
local function graphite_worker(args)
62+
fiber.name(args.name)
4163

4264
while true do
4365
metrics.invoke_callbacks()
4466
for _, c in pairs(metrics.collectors()) do
4567
for _, obs in ipairs(c:collect()) do
46-
local data = graphite.format_observation(opts.prefix, obs)
47-
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
68+
local data = graphite.format_observation(args.prefix, obs)
69+
local numbytes = args.sock:sendto(args.host, args.port, data)
4870
if numbytes == nil then
4971
log.error('Error while sending to host %s port %s data %s',
50-
opts.host, opts.port, data)
72+
args.host, args.port, data)
5173
end
5274
end
5375
end
5476

55-
fiber.sleep(opts.send_interval)
77+
fiber.sleep(args.send_interval)
5678
end
5779
end
5880

@@ -64,25 +86,38 @@ function graphite.init(opts)
6486
send_interval = '?number'
6587
}
6688

67-
local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
68-
assert(sock ~= nil, 'Socket creation failed')
89+
local graphite_fiber = create_fiber_table(opts)
6990

70-
local prefix = opts.prefix or DEFAULT_PREFIX
71-
local host = opts.host or DEFAULT_HOST
72-
local port = opts.port or DEFAULT_PORT
73-
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
91+
-- require('config'):reload() triggers only validate() and apply()
92+
-- role's methods without stop().
93+
-- so, we should kill previous fiber if exist.
94+
if GRAPHITE_FIBERS[graphite_fiber.name] then
95+
fiber.kill(GRAPHITE_FIBERS[graphite_fiber.name].fiber)
96+
GRAPHITE_FIBERS[graphite_fiber.name] = nil
97+
require('fiber').yield()
98+
end
7499

75-
fun.iter(fiber.info()):
76-
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
77-
each(function(x) fiber.kill(x) end)
100+
graphite_fiber.sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
101+
assert(graphite_fiber.sock ~= nil, 'Socket creation failed')
78102

79-
fiber.create(graphite_worker, {
80-
prefix = prefix,
81-
sock = sock,
82-
host = host,
83-
port = port,
84-
send_interval = send_interval,
103+
graphite_fiber.fiber = fiber.create(graphite_worker, {
104+
name = graphite_fiber.name,
105+
prefix = graphite_fiber.prefix,
106+
sock = graphite_fiber.sock,
107+
host = graphite_fiber.host,
108+
port = graphite_fiber.port,
109+
send_interval = graphite_fiber.send_interval,
85110
})
111+
112+
GRAPHITE_FIBERS[graphite_fiber.name] = graphite_fiber
113+
end
114+
115+
function graphite.stop()
116+
for _, v in pairs(GRAPHITE_FIBERS) do
117+
v.sock:close()
118+
fiber.kill(v.fiber)
119+
end
120+
GRAPHITE_FIBERS = {}
86121
end
87122

88123
return graphite

test/plugins/graphite_test.lua

Lines changed: 78 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,8 @@ end)
3636

3737
g.after_each(function(cg)
3838
cg.server:exec(function()
39-
local fiber = require('fiber')
40-
local fun = require('fun')
41-
local metrics = require('metrics')
4239
-- Delete all collectors and global labels
43-
metrics.clear()
44-
fun.iter(fiber.info()):
45-
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
46-
each(function(x) fiber.kill(x) end)
47-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
40+
require('metrics').clear()
4841
end)
4942
end)
5043

@@ -152,43 +145,109 @@ g.test_graphite_sends_data_to_socket = function(cg)
152145

153146
local cnt = metrics.counter('test_cnt', 'test-cnt')
154147
cnt:inc(1)
148+
155149
graphite.init({port = port})
156150
end, {port})
157151

158-
require('fiber').sleep(0.5)
152+
sock:readable(2)
153+
159154
local graphite_obs = sock:recvfrom(50)
160155
local obs_table = graphite_obs:split(' ')
161156
t.assert_equals(obs_table[1], 'tarantool.test_cnt')
162157
t.assert_equals(obs_table[2], '1')
163158
sock:close()
159+
160+
cg.server:exec(function()
161+
require('metrics.plugins.graphite').stop()
162+
end)
164163
end
165164

166-
g.test_graphite_kills_previous_fibers_on_init = function(cg)
165+
g.test_graphite_stop_default_fibers = function(cg)
167166
cg.server:exec(function()
168167
local fiber = require('fiber')
169168
local fun = require('fun')
170169
local graphite = require('metrics.plugins.graphite')
171170

172-
local function mock_graphite_worker()
173-
fiber.create(function()
174-
fiber.name('metrics_graphite_worker')
175-
fiber.sleep(math.huge)
176-
end)
171+
local function count_workers()
172+
return fun.iter(fiber.info()):
173+
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
174+
length()
177175
end
178176

177+
t.assert_equals(count_workers(), 0)
178+
179+
graphite.init({})
180+
t.assert_equals(count_workers(), 1)
181+
182+
graphite.stop()
183+
require('fiber'):yield()
184+
t.assert_equals(count_workers(), 0)
185+
end)
186+
end
187+
188+
g.test_graphite_stop_custom_fiber = function(cg)
189+
cg.server:exec(function()
190+
local fiber = require('fiber')
191+
local fun = require('fun')
192+
local graphite = require('metrics.plugins.graphite')
193+
179194
local function count_workers()
180195
return fun.iter(fiber.info()):
181-
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
196+
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
182197
length()
183198
end
184199

200+
local opts = {
201+
prefix = "master",
202+
host = "127.0.0.1",
203+
port = 3333,
204+
send_interval = 1,
205+
}
206+
207+
local opts2 = {
208+
prefix = "tarantool",
209+
host = "127.0.0.1",
210+
port = 4444,
211+
send_interval = 1,
212+
}
213+
185214
t.assert_equals(count_workers(), 0)
186-
mock_graphite_worker()
187-
mock_graphite_worker()
215+
216+
graphite.init(opts)
217+
graphite.init(opts2)
218+
188219
t.assert_equals(count_workers(), 2)
189220

221+
graphite.stop()
222+
require('fiber'):yield()
223+
t.assert_equals(count_workers(), 0)
224+
end)
225+
end
226+
227+
g.test_graphite_double_start = function(cg)
228+
cg.server:exec(function()
229+
local fiber = require('fiber')
230+
local fun = require('fun')
231+
local graphite = require('metrics.plugins.graphite')
232+
233+
local function count_workers()
234+
return fun.iter(fiber.info()):
235+
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
236+
length()
237+
end
238+
239+
t.assert_equals(count_workers(), 0)
240+
190241
graphite.init({})
191-
fiber.yield() -- let cancelled fibers disappear from fiber.info()
242+
192243
t.assert_equals(count_workers(), 1)
244+
245+
graphite.init({})
246+
247+
t.assert_equals(count_workers(), 1)
248+
249+
graphite.stop()
250+
require('fiber'):yield()
251+
t.assert_equals(count_workers(), 0)
193252
end)
194253
end

0 commit comments

Comments
 (0)