Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions apisix/plugins/loki-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,20 @@ function _M.body_filter(conf, ctx)
end


local function resolve_labels(conf_labels, ctx)
local labels = new_tab(0, 4)
for key, value in pairs(conf_labels) do
local new_val, err, n_resolved = core.utils.resolve_var(value, ctx.var)
if not err and n_resolved > 0 then
labels[key] = new_val
else
labels[key] = value
end
end
return labels
end


function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
local max_pending_entries = metadata and metadata.value and
Expand All @@ -218,43 +232,47 @@ function _M.log(conf, ctx)
-- and then add 6 zeros by string concatenation
entry.loki_log_time = tostring(ngx.req.start_time() * 1000) .. "000000"

-- Resolve labels per request into a fresh table and attach to the entry.
-- Mutating `conf.log_labels` in place would leak the first request's
-- resolved values to every later request that shares the same conf
-- (e.g. when this plugin is configured in a global rule).
entry.loki_labels = resolve_labels(conf.log_labels, ctx)

if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end

local labels = conf.log_labels
-- generate a function to be executed by the batch processor.
-- Group entries by their resolved label set so each unique set
-- becomes its own Loki stream within the push.
local func = function(entries)
local streams_by_key = {}
local streams = new_tab(1, 0)

-- parsing possible variables in label value
for key, value in pairs(labels) do
local new_val, err, n_resolved = core.utils.resolve_var(value, ctx.var)
if not err and n_resolved > 0 then
labels[key] = new_val
end
end
for _, e in ipairs(entries) do
local labels = e.loki_labels
e.loki_labels = nil -- clean logger internal field

-- generate a function to be executed by the batch processor
local func = function(entries)
-- build loki request data
local data = {
streams = {
{
local key = core.json.stably_encode(labels)
local stream = streams_by_key[key]
if not stream then
stream = {
stream = labels,
values = new_tab(1, 0),
}
}
}
streams_by_key[key] = stream
table_insert(streams, stream)
end

-- add all entries to the batch
for _, entry in ipairs(entries) do
local log_time = entry.loki_log_time
entry.loki_log_time = nil -- clean logger internal field
local log_time = e.loki_log_time
e.loki_log_time = nil -- clean logger internal field

table_insert(data.streams[1].values, {
log_time, core.json.encode(entry)
table_insert(stream.values, {
log_time, core.json.encode(e)
})
end

return send_http_data(conf, data)
return send_http_data(conf, { streams = streams })
end

batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func, max_pending_entries)
Expand Down
71 changes: 71 additions & 0 deletions t/plugin/loki-logger.t
Original file line number Diff line number Diff line change
Expand Up @@ -423,3 +423,74 @@ GET /hello
hello world
--- error_log
go(): authorization: test1234



=== TEST 17: setup route ($variable in log_labels must resolve per request)
--- config
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
local code, body = t('/apisix/admin/routes/1',
ngx.HTTP_PUT,
[[{
"plugins": {
"loki-logger": {
"endpoint_addrs": ["http://127.0.0.1:3100"],
"tenant_id": "tenant_1",
"log_labels": {
"custom_label": "$arg_X"
},
"batch_max_size": 1
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}]]
)

if code >= 300 then
ngx.status = code
end
ngx.say(body)
}
}
--- response_body
passed



=== TEST 18: hit route twice with distinct $arg_X values
--- request eval
["GET /hello?X=alpha", "GET /hello?X=beta"]
--- response_body eval
["hello world\n", "hello world\n"]



=== TEST 19: both label values must appear in Loki
--- config
location /t {
content_by_lua_block {
local cjson = require("cjson")
local now = ngx.now() * 1000
local data, err = require("lib.grafana_loki").fetch_logs_from_loki(
tostring(now - 3000) .. "000000", -- from
tostring(now) .. "000000", -- to
{ query = [[{custom_label="beta"} | json]] }
)

assert(err == nil, "fetch logs error: " .. (err or ""))
assert(data.status == "success", "loki response error: " .. cjson.encode(data))
-- Without the fix, the first request mutates conf.log_labels in
-- place ("alpha"), so every subsequent entry flushes with that
-- frozen value and no stream with custom_label="beta" exists.
assert(#data.data.result > 0, "no entries with custom_label=beta -- $arg_X did not resolve per request")
}
}
--- error_code: 200
Loading