diff --git a/README.md b/README.md index e9a6f06..64519cd 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,11 @@ Available user configurations are listed as follows: * `path` - If the log server uses a stream-typed unix domain socket, `path` is the socket file path. Note that host/port and path cannot both be empty. At least one must be supplied. + If the log server uses a unix domain socket `path` is the socket file path. Note that host/port and path cannot both be empty. At least one must be supplied. + +* `datagram` + + Sets whether datagrams should be used. For host/port connections this means UDP rather than the default, TCP. For unix domain sockets this uses SOCK_DGRAM rather than the default, SOCK_STREAM. Please note that if you are using datagram then we will send one datagram packet per message. * `max_retry_times` diff --git a/lib/resty/logger/socket.lua b/lib/resty/logger/socket.lua index 7a7b148..8cc34fb 100644 --- a/lib/resty/logger/socket.lua +++ b/lib/resty/logger/socket.lua @@ -3,11 +3,13 @@ local concat = table.concat local tcp = ngx.socket.tcp +local udp = ngx.socket.udp local timer_at = ngx.timer.at local ngx_log = ngx.log local ngx_sleep = ngx.sleep local type = type local pairs = pairs +local ipairs = ipairs local tostring = tostring local debug = ngx.config.debug @@ -55,14 +57,17 @@ local timeout = 1000 -- 1 sec local host local port local path +local datagram = false -- internal variables local buffer_size = 0 --- 2nd level buffer, it stores logs ready to be sent out -local send_buffer = "" -- 1st level buffer, it stores incoming logs -local log_buffer_data = new_tab(20000, 0) -local log_buffer_index = 0 +local incoming_buffer = new_tab(20000, 0) +local incoming_buffer_index = 0 +-- 2nd level buffer, it stores logs ready to be sent out +local send_buffer = new_tab(1000) +local send_buffer_index = 0 +local send_buffer_size = 0 local last_error @@ -87,7 +92,12 @@ local function _do_connect() local ok, err if not connected then - sock, err = tcp() + if datagram then + sock, err = udp() + else + sock, err = tcp() + end + if not sock then _write_error(err) return nil, err @@ -98,9 +108,17 @@ local function _do_connect() -- host/port and path config have already been checked in init() if host and port then - ok, err = sock:connect(host, port) + if datagram then + ok, err = sock:setpeername(host, port) + else + ok, err = sock:connect(host, port) + end elseif path then - ok, err = sock:connect("unix:" .. path) + if datagram then + ok, err = sock:setpeername("unix:" .. path) + else + ok, err = sock:connect("unix:" .. path) + end end return ok, err @@ -150,30 +168,50 @@ local function _connect() return true end -local function _prepare_stream_buffer() - local packet = concat(log_buffer_data) - send_buffer = send_buffer .. packet +local function _prepare_send_buffer() + for i=1, incoming_buffer_index do + send_buffer_index = send_buffer_index + 1 + send_buffer[send_buffer_index] = incoming_buffer[i] + end + + send_buffer_size = buffer_size + incoming_buffer_index = 0 + clear_tab(incoming_buffer) +end - clear_tab(log_buffer_data) - log_buffer_index = 0 +local function _reset_send_buffer() + buffer_size = buffer_size - send_buffer_size + send_buffer_index = 0 + send_buffer_size = 0 + clear_tab(send_buffer) end -local function _do_flush() - local packet = send_buffer +-- this is expensive and should only be used to tidy up in case of an error +local function _pop_send_buffer(count) + for i=1, count do + local packet = send_buffer.remove(i) + send_buffer_index = send_buffer_index - 1 + send_buffer_size = send_buffer_size - #packet + end +end + +local function _do_stream_flush() local ok, err = _connect() if not ok then return nil, err end - local bytes, err = sock:send(packet) + local bytes, err = sock:send(send_buffer) if not bytes then -- sock:send always close current connection on error return nil, err end + _reset_send_buffer() + if debug then ngx.update_time() - ngx_log(DEBUG, ngx.now(), ":log flush:" .. bytes .. ":" .. packet) + ngx_log(DEBUG, ngx.now(), ":log flush:" .. bytes) end ok, err = sock:setkeepalive(0, pool_size) @@ -184,8 +222,33 @@ local function _do_flush() return true end +local function _do_datagram_flush() + local ok, err = _connect() + if not ok then + return nil, err + end + + for i, packet in ipairs(send_buffer) do + local bytes, err = sock:send(packet) + if not bytes then + -- ensure we don't resend packets later that we've already sent + _pop_send_buffer(i - 1) + return nil, err + end + + if debug then + ngx.update_time() + ngx_log(DEBUG, ngx.now(), ":log flush:" .. bytes) + end + end + + _reset_send_buffer() + + return true +end + local function _need_flush() - if log_buffer_index > 0 or #send_buffer > 0 then + if incoming_buffer_index > 0 or send_buffer_index > 0 then return true end @@ -218,7 +281,7 @@ local function _flush() if not _need_flush() then if debug then - ngx_log(DEBUG, "do not need to flush:", log_buffer_index) + ngx_log(DEBUG, "do not need to flush") end _flush_unlock() return true @@ -231,11 +294,15 @@ local function _flush() end while retry_send <= max_retry_times do - if log_buffer_index > 0 then - _prepare_stream_buffer() + if incoming_buffer_index > 0 then + _prepare_send_buffer() end - ok, err = _do_flush() + if datagram then + ok, err = _do_datagram_flush() + else + ok, err = _do_stream_flush() + end if ok then break @@ -263,9 +330,6 @@ local function _flush() return nil, err_msg end - buffer_size = buffer_size - #send_buffer - send_buffer = "" - return true end @@ -280,12 +344,11 @@ local function _flush_buffer() end local function _write_buffer(msg) - log_buffer_index = log_buffer_index + 1 - log_buffer_data[log_buffer_index] = msg + incoming_buffer_index = incoming_buffer_index + 1 + incoming_buffer[incoming_buffer_index] = msg buffer_size = buffer_size + #msg - return buffer_size end @@ -301,6 +364,8 @@ function _M.init(user_config) port = v elseif k == "path" then path = v + elseif k == "datagram" then + datagram = v elseif k == "flush_limit" then flush_limit = v elseif k == "drop_limit" then @@ -321,7 +386,6 @@ function _M.init(user_config) return nil, "no logging server configured. Need host/port or path." end - if (flush_limit >= drop_limit) then return nil, "flush_limit should < drop_limit" end