Skip to content

Commit

Permalink
fix(broker): worker id based queues
Browse files Browse the repository at this point in the history
### Summary

Switches from worker connection based broker queues to worker id based broker queues
that makes it possible to retain events even when worker has not yet connected
(on node startup), or worker is restarting/reloading.
  • Loading branch information
bungle committed Jun 19, 2024
1 parent b8606c5 commit e958d24
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 43 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ The `opts` parameter is a Lua table with named options:
See the `target` parameter of the [publish](#publish) method.
* `max_queue_len`: (optional) max length of internal events buffer queue, default `1024 * 10`.
* `max_payload_len`: (optional) max length of serialized event data, default `1024 * 64`, max `1024 * 1024 * 16`.
* `enable_privileged_agent`: (optional) whether to enable privileged agent to receive events. By default
it is enabled dynamically on broker connection for backward compatibility,
but it is strongly suggested to explicitly configure this either with
`true` or `false` as that ensures that events queue for privileged agent
will be pre-created during initialization (or not created at all).

The return value will be the event object or `nil`.

Expand Down
149 changes: 115 additions & 34 deletions lualib/resty/events/broker.lua
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
local cjson = require "cjson.safe"
local nkeys = require "table.nkeys"
local codec = require "resty.events.codec"
local lrucache = require "resty.lrucache"
local queue = require "resty.events.queue"
local server = require("resty.events.protocol").server
local is_timeout = server.is_timeout
local is_closed = server.is_closed

local pairs = pairs
local setmetatable = setmetatable
local random = math.random

local ngx = ngx
local log = ngx.log
local exit = ngx.exit
local exiting = ngx.worker.exiting
local worker_count = ngx.worker.count
local ERR = ngx.ERR
local DEBUG = ngx.DEBUG
local NOTICE = ngx.NOTICE

local spawn = ngx.thread.spawn
local kill = ngx.thread.kill
Expand All @@ -37,41 +37,53 @@ end
local function broadcast_events(self, unique, data)
local n = 0

-- if unique, schedule to a random worker
local idx = unique and random(1, nkeys(self._clients))
local queues = self._queues

for _, q in pairs(self._clients) do

-- skip some and broadcast to one workers
if unique then
idx = idx - 1

if idx > 0 then
goto continue
end
end

local ok, err = q:push(data)
local first_worker_id = self._first_worker_id
local last_worker_id = self._last_worker_id

if unique then
-- if unique, schedule to a random worker
local worker_id = random(first_worker_id, last_worker_id)
local worker_queue = queues[worker_id]
local ok, err = worker_queue:push(data)
if not ok then
log(ERR, "failed to publish event: ", err, ". ",
"data is :", cjson_encode(decode(data)))
if worker_id == -1 then
log(ERR, "failed to publish unique event to privileged agent: ", err,
". data is :", cjson_encode(decode(data)))
else
log(ERR, "failed to publish unique event to worker #", worker_id,
": ", err, ". data is :", cjson_encode(decode(data)))
end

else
n = n + 1
end

if unique then
break
else
for worker_id = first_worker_id, last_worker_id do
local worker_queue = queues[worker_id]
local ok, err = worker_queue:push(data)
if not ok then
if worker_id == -1 then
log(ERR, "failed to publish event to privileged agent: ", err,
". data is :", cjson_encode(decode(data)))
else
log(ERR, "failed to publish event to worker #", worker_id,
": ", err, ". data is :", cjson_encode(decode(data)))
end

else
n = n + 1
end
end

::continue::
end -- for q in pairs(_clients)
end

log(DEBUG, "event published to ", n, " workers")
end

local function read_thread(self, worker_connection)
local worker_id = worker_connection.info.id
while not terminating(self, worker_connection) do
local data, err = worker_connection:recv_frame()
if err then
Expand All @@ -85,15 +97,23 @@ local function read_thread(self, worker_connection)

if not data then
if not exiting() then
log(ERR, "did not receive event from worker")
if worker_id == -1 then
log(ERR, "did not receive event from privileged agent")
else
log(ERR, "did not receive event from worker #", worker_id)
end
end
goto continue
end

local event_data, err = decode(data)
if not event_data then
if not exiting() then
log(ERR, "failed to decode event data: ", err)
if worker_id == -1 then
log(ERR, "failed to decode event data on privileged agent: ", err)
else
log(ERR, "failed to decode event data on worker #", worker_id, ": ", err)
end
end
goto continue
end
Expand All @@ -102,7 +122,11 @@ local function read_thread(self, worker_connection)
local unique = event_data.spec.unique
if unique then
if self._uniques:get(unique) then
log(DEBUG, "unique event is duplicate: ", unique)
if worker_id == -1 then
log(DEBUG, "unique event is duplicate on privileged agent: ", unique)
else
log(DEBUG, "unique event is duplicate on worker #", worker_id, ": ", unique)
end
goto continue
end

Expand All @@ -118,9 +142,9 @@ local function read_thread(self, worker_connection)
return true
end

local function write_thread(self, worker_connection)
local function write_thread(self, worker_connection, worker_queue)
while not terminating(self, worker_connection) do
local payload, err = self._clients[worker_connection]:pop()
local payload, err = worker_queue:pop()
if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
Expand All @@ -146,21 +170,38 @@ local _MT = { __index = _M, }
function _M.new(opts)
return setmetatable({
_opts = opts,
_queues = nil,
_uniques = nil,
_clients = nil,
_first_worker_id = nil,
_last_worker_id = nil,
}, _MT)
end

function _M:init()
assert(self._opts)
local opts = self._opts

assert(opts)

local _uniques, err = lrucache.new(MAX_UNIQUE_EVENTS)
if not _uniques then
return nil, "failed to create the events cache: " .. (err or "unknown")
end

local queues = {}

local first_worker_id = opts.enable_privileged_agent == true and -1 or 0
local last_worker_id = worker_count() - 1

for i = first_worker_id, last_worker_id do
queues[i] = queue.new(opts.max_queue_len)
end

self._uniques = _uniques
self._clients = setmetatable({}, WEAK_KEYS_MT)
self._queues = queues
self._first_worker_id = first_worker_id
self._last_worker_id = last_worker_id

return true
end
Expand All @@ -172,14 +213,42 @@ function _M:run()
exit(444)
end

self._clients[worker_connection] = queue.new(self._opts.max_queue_len)
local clients = self._clients
local queues = self._queues
local worker_id = worker_connection.info.id
local worker_pid = worker_connection.info.pid

if worker_id == -1 and not queues[-1] then
-- TODO: this is for backward compatibility
--
-- Queue for the privileged agent is dynamically
-- created because it is not always enabled or
-- does not always connect to broker. This also
-- means that privileged agent may miss some
-- events on a startup.
--
-- It is suggested to instead explicitly pass
-- an option: enable_privileged_agent=true|false.
queues[-1] = queue.new(self._opts.max_queue_len)
self._first_worker_id = -1
end

clients[worker_connection] = true

local read_thread_co = spawn(read_thread, self, worker_connection)
local write_thread_co = spawn(write_thread, self, worker_connection)
local write_thread_co = spawn(write_thread, self, worker_connection, queues[worker_id])

if worker_id == -1 then
log(NOTICE, "privileged agent connected to events broker (worker pid: ",
worker_pid, ")")
else
log(NOTICE, "worker #", worker_id, " connected to events broker (worker pid: ",
worker_pid, ")")
end

local ok, err, perr = wait(read_thread_co, write_thread_co)

self._clients[worker_connection] = nil
clients[worker_connection] = nil

if exiting() then
kill(read_thread_co)
Expand All @@ -188,12 +257,24 @@ function _M:run()
end

if not ok and not is_closed(err) then
log(ERR, "event broker failed: ", err)
if worker_id == -1 then
log(ERR, "event broker failed on worker privileged agent: ", err,
" (worker pid: ", worker_pid, ")")
else
log(ERR, "event broker failed on worker #", worker_id, ": ", err,
" (worker pid: ", worker_pid, ")")
end
return exit(ngx.ERROR)
end

if perr and not is_closed(perr) then
log(ERR, "event broker failed: ", perr)
if worker_id == -1 then
log(ERR, "event broker failed on worker privileged agent: ", perr,
" (worker pid: ", worker_pid, ")")
else
log(ERR, "event broker failed on worker #", worker_id, ": ", perr,
" (worker pid: ", worker_pid, ")")
end
return exit(ngx.ERROR)
end

Expand Down
14 changes: 11 additions & 3 deletions lualib/resty/events/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ local function check_options(opts)
return nil, '"max_payload_len" option is invalid'
end

if opts.enable_privileged_agent ~= nil and type(opts.enable_privileged_agent) ~= "boolean" then
return nil, '"enable_privileged_agent" option must be a boolean'
end

opts.testing = opts.testing or false

if type(opts.testing) ~= "boolean" then
Expand Down Expand Up @@ -110,16 +114,20 @@ function _M:init_worker()

local ok, err

-- only enable listening on special worker id
if is_broker then
-- enable listening on broker
ok, err = self.broker:init()

-- disable listening in other worker
elseif worker_id >= 0 then
-- disable listening on other workers
ok, err = disable_listening(opts.listening)

-- we do nothing in privileged worker
else
-- privileged agent
if opts.enable_privileged_agent == false then
return true
end

ok = true
end

Expand Down
2 changes: 1 addition & 1 deletion t/broadcast.t
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ ok
--- error_log eval
[
qr/event published to 1 workers/,
qr/unique event is duplicate: unique_value/,
qr/unique event is duplicate on worker #\d+: unique_value/,
qr/event published to 4 workers/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/,
Expand Down
2 changes: 1 addition & 1 deletion t/events-compat.t
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ GET /test
ok
--- error_log
event published to 1 workers
unique event is duplicate: unique_value
unique event is duplicate on worker #0: unique_value
--- no_error_log
[error]
[crit]
Expand Down
7 changes: 6 additions & 1 deletion t/events.t
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ GET /test
ok
--- error_log
event published to 1 workers
unique event is duplicate: unique_value
unique event is duplicate on worker #0: unique_value
--- no_error_log
[error]
[crit]
Expand Down Expand Up @@ -395,6 +395,10 @@ worker-events: handler event; source=content_by_lua, event=request3, wid=\d+, d
local ok, err = pcall(ev.new, {listening = "unix:x", max_payload_len = 2^24 + 1})
assert(not ok)
ngx.say(trim(err))
local ok, err = pcall(ev.new, {listening = "unix:x", enable_privileged_agent = "invalid"})
assert(not ok)
ngx.say(trim(err))
}
}
--- request
Expand All @@ -413,6 +417,7 @@ optional "unique_timeout" option must be a number
"max_payload_len" option must be a number
"max_payload_len" option is invalid
"max_payload_len" option is invalid
"enable_privileged_agent" option must be a boolean
--- no_error_log
[error]
[crit]
Expand Down
2 changes: 1 addition & 1 deletion t/privileged.t
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ ok
[
qr/privileged agent process/,
qr/event published to 1 workers/,
qr/unique event is duplicate: unique_value/,
qr/unique event is duplicate on worker #\d+: unique_value/,
qr/event published to 4 workers/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/,
Expand Down
2 changes: 1 addition & 1 deletion t/stream-broadcast.t
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ ok
--- error_log eval
[
qr/event published to 1 workers/,
qr/unique event is duplicate: unique_value/,
qr/unique event is duplicate on worker #\d+: unique_value/,
qr/event published to 4 workers/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/,
qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/,
Expand Down
2 changes: 1 addition & 1 deletion t/stream-events.t
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ GET /test
ok
--- error_log
event published to 1 workers
unique event is duplicate: unique_value
unique event is duplicate on worker #0: unique_value
--- no_error_log
[error]
[crit]
Expand Down

0 comments on commit e958d24

Please sign in to comment.