Skip to content

Commit

Permalink
fix(*): retain events on send failures
Browse files Browse the repository at this point in the history
### Summary

If sending an event over the socket connection fails, the popped event data
is retained by pushing it back to the front of the queue.
  • Loading branch information
bungle committed Jun 19, 2024
1 parent a7f3d93 commit 91297f2
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 22 deletions.
11 changes: 11 additions & 0 deletions lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ local function read_thread(self, worker_connection)
end

local function write_thread(self, worker_connection, worker_queue)
local worker_id = worker_connection.info.id
while not terminating(self, worker_connection) do
local payload, err = worker_queue:pop()
if not payload then
Expand All @@ -156,6 +157,16 @@ local function write_thread(self, worker_connection, worker_queue)

local _, err = worker_connection:send_frame(payload)
if err then
local ok, push_err = worker_queue:push_front(payload)
if not ok then
if worker_id == -1 then
log(ERR, "failed to retain event for privileged agent: ",
push_err, ". data is :", cjson_encode(decode(payload)))
else
log(ERR, "failed to retain event for worker #", worker_id, ": ",
push_err, ". data is :", cjson_encode(decode(payload)))
end
end
return nil, "failed to send event: " .. err
end

Expand Down
16 changes: 16 additions & 0 deletions lualib/resty/events/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ function _M:push(item)
end


--- Put an item at the head of the queue (index `first - 1`).
-- When the queue holds no elements (`first > last`) the call is handed over
-- to `push`, and whatever `push` returns is returned unchanged.
-- @param item value to store
-- @return `true` on the direct path; otherwise the result of `self:push(item)`
function _M:push_front(item)
    if self.first > self.last then
        -- nothing queued: the regular push handles this case
        return self:push(item)
    end

    -- step the head index back by one and store the item there
    local head = self.first - 1
    self.elts[head] = item
    self.first = head

    -- signal the semaphore once for the newly stored item
    self.semaphore:post()

    return true
end


function _M:pop()
local ok, err = self.semaphore:wait(1)
if not ok then
Expand Down
48 changes: 27 additions & 21 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ local ngx = ngx
local log = ngx.log
local sleep = ngx.sleep
local exiting = ngx.worker.exiting
local ngx_worker_id = ngx.worker.id
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE

local spawn = ngx.thread.spawn
local kill = ngx.thread.kill
Expand Down Expand Up @@ -50,9 +52,6 @@ local PAYLOAD_T = {
data = '',
}

--local _worker_pid = ngx.worker.pid()
local _worker_id = ngx.worker.id() or -1

local _M = {}
local _MT = { __index = _M, }

Expand Down Expand Up @@ -118,18 +117,20 @@ function _M.new(opts)
_sub_queue = queue.new(max_queue_len),
_callback = callback.new(),
_connected = nil,
_worker_id = nil,
_opts = opts,
}

return setmetatable(self, _MT)
end

local function read_thread(self, broker_connection)
local sub_queue = self._sub_queue
while not terminating(self) do
local data, err = broker_connection:recv_frame()
if err then
if not is_timeout(err) then
return nil, "failed to read event: " .. err
return nil, err
end

-- timeout
Expand All @@ -152,7 +153,7 @@ local function read_thread(self, broker_connection)
end

-- got an event data, push to queue, callback in events_thread
local _, err = self._sub_queue:push(event_data)
local _, err = sub_queue:push(event_data)
if err then
if not exiting() then
log(ERR, "failed to store event: ", err, ". data is: ",
Expand All @@ -169,9 +170,9 @@ end

local function write_thread(self, broker_connection)
local counter = 0

local pub_queue = self._pub_queue
while not terminating(self) do
local payload, err = self._pub_queue:pop()
local payload, err = pub_queue:pop()
if err then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
Expand All @@ -183,6 +184,10 @@ local function write_thread(self, broker_connection)

local _, err = broker_connection:send_frame(payload)
if err then
local ok, push_err = pub_queue:push_front(payload)
if not ok then
log(ERR, "failed to retain event: ", push_err)
end
return nil, "failed to send event: " .. err
end

Expand Down Expand Up @@ -233,7 +238,7 @@ function _M:communicate()
local listening = self._opts.listening

if not check_sock_exist(listening) then
log(DEBUG, "unix domain sock(", listening, ") is not ready")
log(DEBUG, "unix domain socket (", listening, ") is not ready")

-- try to reconnect broker, avoid crit error log
start_communicate_timer(self, 0.002)
Expand All @@ -260,13 +265,17 @@ function _M:communicate()
return
end

log(DEBUG, _worker_id, " on (", listening, ") is ready")

self._connected = true

local read_thread_co = spawn(read_thread, self, broker_connection)
local write_thread_co = spawn(write_thread, self, broker_connection)

if self._worker_id == -1 then
log(NOTICE, "privileged agent is ready to accept events from ", listening)
else
log(NOTICE, "worker #", self._worker_id, " is ready to accept events from ", listening)
end

local ok, err, perr = wait(read_thread_co, write_thread_co)

self._connected = nil
Expand All @@ -281,11 +290,11 @@ function _M:communicate()
end

if not ok then
log(ERR, "event worker failed: ", err)
log(ERR, "event worker failed to communicate with broker (", err, ")")
end

if perr then
log(ERR, "event worker failed: ", perr)
log(ERR, "event worker failed to communicate with broker (", perr, ")")
end

wait(read_thread_co)
Expand All @@ -304,11 +313,11 @@ function _M:process_events()
end

if not ok then
log(ERR, "event worker failed: ", err)
log(ERR, "event worker failed to process events (", err, ")")
end

if perr then
log(ERR, "event worker failed: ", perr)
log(ERR, "event worker failed to process events (", perr, ")")
end

start_process_events_timer(self)
Expand All @@ -317,6 +326,8 @@ end
function _M:init()
assert(self._opts)

self._worker_id = ngx_worker_id() or -1

start_timers(self)

return true
Expand All @@ -329,7 +340,7 @@ local function post_event(self, source, event, data, spec)
EVENT_T.source = source
EVENT_T.event = event
EVENT_T.data = data
EVENT_T.wid = _worker_id
EVENT_T.wid = self._worker_id or ngx_worker_id() or -1

-- encode event info
str, err = encode(EVENT_T)
Expand Down Expand Up @@ -366,12 +377,6 @@ local function post_event(self, source, event, data, spec)
end

function _M:publish(target, source, event, data)
local ok, err

-- if not self._connected then
-- return nil, "not initialized yet"
-- end

assert(type(target) == "string" and target ~= "", "target is required")
assert(type(source) == "string" and source ~= "", "source is required")
assert(type(event) == "string" and event ~= "", "event is required")
Expand All @@ -389,6 +394,7 @@ function _M:publish(target, source, event, data)
return true
end

local ok, err
if target == "current" then
ok, err = self._sub_queue:push({
source = source,
Expand Down
2 changes: 1 addition & 1 deletion t/events.t
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ worker-events: handler event; source=content_by_lua, event=request6, wid=\d+, d
ev:subscribe("*", "*", function(data, event, source, wid)
ngx.log(ngx.DEBUG, "worker-events: handler event; ", "source=",source,", event=",event, ", wid=", wid,
", data=", tostring(data))
end)
end)

ev:publish("all", "content_by_lua", "request3", "01234567890")

Expand Down
63 changes: 63 additions & 0 deletions t/queue.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
# Test::Nginx test script exercising resty.events.queue (push, push_front, pop).
use Test::Nginx::Socket::Lua;

# Planned test count: five checks per test block, multiplied by repeat_each().
# NOTE(review): presumably the five are the response body, the three
# no_error_log patterns and the implicit status check -- confirm.
plan tests => repeat_each() * (blocks() * 5);

# Default TEST_NGINX_HTML_DIR to html_dir() unless the environment already sets it.
$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

# Harness configuration, then run the test blocks declared after __DATA__.
# NOTE(review): master_on() / workers(1) presumably start nginx with a master
# process and a single worker -- confirm against the Test::Nginx documentation.
check_accum_error_log();
master_on();
workers(1);
run_tests();

__DATA__

=== TEST 1: queue works correctly
--- http_config
lua_package_path "../lua-resty-core/lib/?.lua;lualib/?/init.lua;lualib/?.lua;;";
--- config
location = /test {
access_by_lua_block {
local queue = require("resty.events.queue").new(10240)
local value, err = queue:pop()
ngx.say(err)
assert(queue:push_front("first"))
ngx.say(queue:pop())
assert(queue:push("second"))
assert(queue:push_front("first"))
ngx.say(queue:pop())
ngx.say(queue:pop())
value, err = queue:pop()
ngx.say(err)
assert(queue:push("first"))
assert(queue:push("second"))
ngx.say(queue:pop())
ngx.say(queue:pop())
assert(queue:push_front("third"))
assert(queue:push_front("second"))
assert(queue:push_front("first"))
ngx.say(queue:pop())
ngx.say(queue:pop())
ngx.say(queue:pop())
value, err = queue:pop()
ngx.say(err)
}
}
--- request
GET /test
--- response_body
timeout
first
first
second
timeout
first
second
first
second
third
timeout
--- no_error_log
[crit]
[error]
[warn]

0 comments on commit 91297f2

Please sign in to comment.