diff --git a/README.md b/README.md index 2e61899..c18e3fe 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,11 @@ The `opts` parameter is a Lua table with named options: See the `target` parameter of the [publish](#publish) method. * `max_queue_len`: (optional) max length of internal events buffer queue, default `1024 * 10`. * `max_payload_len`: (optional) max length of serialized event data, default `1024 * 64`, max `1024 * 1024 * 16`. +* `enable_privileged_agent`: (optional) whether to enable privileged agent to receive events. By default + it is enabled dynamically on broker connection for backward compatibility, + but it is strongly suggested to explicitly configure this either with + `true` or `false` as that ensures that events queue for privileged agent + will be pre-created during initialization (or not created at all). The return value will be the event object or `nil`. diff --git a/lualib/resty/events/broker.lua b/lualib/resty/events/broker.lua index 31aaabd..b5c7c4d 100644 --- a/lualib/resty/events/broker.lua +++ b/lualib/resty/events/broker.lua @@ -1,5 +1,4 @@ local cjson = require "cjson.safe" -local nkeys = require "table.nkeys" local codec = require "resty.events.codec" local lrucache = require "resty.lrucache" local queue = require "resty.events.queue" @@ -7,7 +6,6 @@ local server = require("resty.events.protocol").server local is_timeout = server.is_timeout local is_closed = server.is_closed -local pairs = pairs local setmetatable = setmetatable local random = math.random @@ -15,8 +13,10 @@ local ngx = ngx local log = ngx.log local exit = ngx.exit local exiting = ngx.worker.exiting +local worker_count = ngx.worker.count local ERR = ngx.ERR local DEBUG = ngx.DEBUG +local NOTICE = ngx.NOTICE local spawn = ngx.thread.spawn local kill = ngx.thread.kill @@ -37,41 +37,53 @@ end local function broadcast_events(self, unique, data) local n = 0 - -- if unique, schedule to a random worker - local idx = unique and random(1, nkeys(self._clients)) + local queues = self._queues - for _, q in pairs(self._clients) do - - -- skip some and broadcast to one workers - if unique then - idx = idx - 1 - - if idx > 0 then - goto continue - end - end - - local ok, err = q:push(data) + local first_worker_id = self._first_worker_id + local last_worker_id = self._last_worker_id + if unique then + -- if unique, schedule to a random worker + local worker_id = random(first_worker_id, last_worker_id) + local worker_queue = queues[worker_id] + local ok, err = worker_queue:push(data) if not ok then - log(ERR, "failed to publish event: ", err, ". ", - "data is :", cjson_encode(decode(data))) + if worker_id == -1 then + log(ERR, "failed to publish unique event to privileged agent: ", err, + ". data is :", cjson_encode(decode(data))) + else + log(ERR, "failed to publish unique event to worker #", worker_id, + ": ", err, ". data is :", cjson_encode(decode(data))) + end else n = n + 1 + end - if unique then - break + else + for worker_id = first_worker_id, last_worker_id do + local worker_queue = queues[worker_id] + local ok, err = worker_queue:push(data) + if not ok then + if worker_id == -1 then + log(ERR, "failed to publish event to privileged agent: ", err, + ". data is :", cjson_encode(decode(data))) + else + log(ERR, "failed to publish event to worker #", worker_id, + ": ", err, ". data is :", cjson_encode(decode(data))) + end + + else + n = n + 1 end end - - ::continue:: - end -- for q in pairs(_clients) + end log(DEBUG, "event published to ", n, " workers") end local function read_thread(self, worker_connection) + local worker_id = worker_connection.info.id while not terminating(self, worker_connection) do local data, err = worker_connection:recv_frame() if err then @@ -85,7 +97,11 @@ local function read_thread(self, worker_connection) if not data then if not exiting() then - log(ERR, "did not receive event from worker") + if worker_id == -1 then + log(ERR, "did not receive event from privileged agent") + else + log(ERR, "did not receive event from worker #", worker_id) + end end goto continue end @@ -93,7 +109,11 @@ local function read_thread(self, worker_connection) local event_data, err = decode(data) if not event_data then if not exiting() then - log(ERR, "failed to decode event data: ", err) + if worker_id == -1 then + log(ERR, "failed to decode event data on privileged agent: ", err) + else + log(ERR, "failed to decode event data on worker #", worker_id, ": ", err) + end end goto continue end @@ -102,7 +122,11 @@ local function read_thread(self, worker_connection) local unique = event_data.spec.unique if unique then if self._uniques:get(unique) then - log(DEBUG, "unique event is duplicate: ", unique) + if worker_id == -1 then + log(DEBUG, "unique event is duplicate on privileged agent: ", unique) + else + log(DEBUG, "unique event is duplicate on worker #", worker_id, ": ", unique) + end goto continue end @@ -118,9 +142,9 @@ local function read_thread(self, worker_connection) return true end -local function write_thread(self, worker_connection) +local function write_thread(self, worker_connection, worker_queue) while not terminating(self, worker_connection) do - local payload, err = self._clients[worker_connection]:pop() + local payload, err = worker_queue:pop() if not payload then if not is_timeout(err) then return nil, "semaphore wait error: " .. err @@ -146,21 +170,38 @@ local _MT = { __index = _M, } function _M.new(opts) return setmetatable({ _opts = opts, + _queues = nil, _uniques = nil, _clients = nil, + _first_worker_id = nil, + _last_worker_id = nil, }, _MT) end function _M:init() - assert(self._opts) + local opts = self._opts + + assert(opts) local _uniques, err = lrucache.new(MAX_UNIQUE_EVENTS) if not _uniques then return nil, "failed to create the events cache: " .. (err or "unknown") end + local queues = {} + + local first_worker_id = opts.enable_privileged_agent == true and -1 or 0 + local last_worker_id = worker_count() - 1 + + for i = first_worker_id, last_worker_id do + queues[i] = queue.new(opts.max_queue_len) + end + self._uniques = _uniques self._clients = setmetatable({}, WEAK_KEYS_MT) + self._queues = queues + self._first_worker_id = first_worker_id + self._last_worker_id = last_worker_id return true end @@ -172,14 +213,42 @@ function _M:run() exit(444) end - self._clients[worker_connection] = queue.new(self._opts.max_queue_len) + local clients = self._clients + local queues = self._queues + local worker_id = worker_connection.info.id + local worker_pid = worker_connection.info.pid + + if worker_id == -1 and not queues[-1] then + -- TODO: this is for backward compatibility + -- + -- Queue for the privileged agent is dynamically + -- created because it is not always enabled or + -- does not always connect to broker. This also + -- means that privileged agent may miss some + -- events on a startup. + -- + -- It is suggested to instead explicitly pass + -- an option: enable_privileged_agent=true|false. + queues[-1] = queue.new(self._opts.max_queue_len) + self._first_worker_id = -1 + end + + clients[worker_connection] = true local read_thread_co = spawn(read_thread, self, worker_connection) - local write_thread_co = spawn(write_thread, self, worker_connection) + local write_thread_co = spawn(write_thread, self, worker_connection, queues[worker_id]) + + if worker_id == -1 then + log(NOTICE, "privileged agent connected to events broker (worker pid: ", + worker_pid, ")") + else + log(NOTICE, "worker #", worker_id, " connected to events broker (worker pid: ", + worker_pid, ")") + end local ok, err, perr = wait(read_thread_co, write_thread_co) - self._clients[worker_connection] = nil + clients[worker_connection] = nil if exiting() then kill(read_thread_co) @@ -188,12 +257,24 @@ function _M:run() end if not ok and not is_closed(err) then - log(ERR, "event broker failed: ", err) + if worker_id == -1 then + log(ERR, "event broker failed on worker privileged agent: ", err, + " (worker pid: ", worker_pid, ")") + else + log(ERR, "event broker failed on worker #", worker_id, ": ", err, + " (worker pid: ", worker_pid, ")") + end return exit(ngx.ERROR) end if perr and not is_closed(perr) then - log(ERR, "event broker failed: ", perr) + if worker_id == -1 then + log(ERR, "event broker failed on worker privileged agent: ", perr, + " (worker pid: ", worker_pid, ")") + else + log(ERR, "event broker failed on worker #", worker_id, ": ", perr, + " (worker pid: ", worker_pid, ")") + end return exit(ngx.ERROR) end diff --git a/lualib/resty/events/init.lua b/lualib/resty/events/init.lua index 21a9042..66120e9 100644 --- a/lualib/resty/events/init.lua +++ b/lualib/resty/events/init.lua @@ -78,6 +78,10 @@ local function check_options(opts) return nil, '"max_payload_len" option is invalid' end + if opts.enable_privileged_agent ~= nil and type(opts.enable_privileged_agent) ~= "boolean" then + return nil, '"enable_privileged_agent" option must be a boolean' + end + opts.testing = opts.testing or false if type(opts.testing) ~= "boolean" then @@ -110,16 +114,20 @@ function _M:init_worker() local ok, err - -- only enable listening on special worker id if is_broker then + -- enable listening on broker ok, err = self.broker:init() - -- disable listening in other worker elseif worker_id >= 0 then + -- disable listening on other workers ok, err = disable_listening(opts.listening) - -- we do nothing in privileged worker else + -- privileged agent + if opts.enable_privileged_agent == false then + return true + end + ok = true end diff --git a/t/broadcast.t b/t/broadcast.t index d60f79f..99825f7 100644 --- a/t/broadcast.t +++ b/t/broadcast.t @@ -252,7 +252,7 @@ ok --- error_log eval [ qr/event published to 1 workers/, - qr/unique event is duplicate: unique_value/, + qr/unique event is duplicate on worker #\d+: unique_value/, qr/event published to 4 workers/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/, diff --git a/t/events-compat.t b/t/events-compat.t index fa532f4..9c258b1 100644 --- a/t/events-compat.t +++ b/t/events-compat.t @@ -219,7 +219,7 @@ GET /test ok --- error_log event published to 1 workers -unique event is duplicate: unique_value +unique event is duplicate on worker #0: unique_value --- no_error_log [error] [crit] diff --git a/t/events.t b/t/events.t index 3587426..7e4be01 100644 --- a/t/events.t +++ b/t/events.t @@ -239,7 +239,7 @@ GET /test ok --- error_log event published to 1 workers -unique event is duplicate: unique_value +unique event is duplicate on worker #0: unique_value --- no_error_log [error] [crit] @@ -395,6 +395,10 @@ worker-events: handler event; source=content_by_lua, event=request3, wid=\d+, d local ok, err = pcall(ev.new, {listening = "unix:x", max_payload_len = 2^24 + 1}) assert(not ok) ngx.say(trim(err)) + + local ok, err = pcall(ev.new, {listening = "unix:x", enable_privileged_agent = "invalid"}) + assert(not ok) + ngx.say(trim(err)) } } --- request @@ -413,6 +417,7 @@ optional "unique_timeout" option must be a number "max_payload_len" option must be a number "max_payload_len" option is invalid "max_payload_len" option is invalid +"enable_privileged_agent" option must be a boolean --- no_error_log [error] [crit] diff --git a/t/privileged.t b/t/privileged.t index 039c4dd..1848543 100644 --- a/t/privileged.t +++ b/t/privileged.t @@ -263,7 +263,7 @@ ok [ qr/privileged agent process/, qr/event published to 1 workers/, - qr/unique event is duplicate: unique_value/, + qr/unique event is duplicate on worker #\d+: unique_value/, qr/event published to 4 workers/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/, diff --git a/t/stream-broadcast.t b/t/stream-broadcast.t index 16ebfe9..06363de 100644 --- a/t/stream-broadcast.t +++ b/t/stream-broadcast.t @@ -317,7 +317,7 @@ ok --- error_log eval [ qr/event published to 1 workers/, - qr/unique event is duplicate: unique_value/, + qr/unique event is duplicate on worker #\d+: unique_value/, qr/event published to 4 workers/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=0, data=01234567890/, qr/worker-events: handler event; source=content_by_lua, event=request1, wid=\d+, by=1, data=01234567890/, diff --git a/t/stream-events.t b/t/stream-events.t index 57b2493..e48ef94 100644 --- a/t/stream-events.t +++ b/t/stream-events.t @@ -294,7 +294,7 @@ GET /test ok --- error_log event published to 1 workers -unique event is duplicate: unique_value +unique event is duplicate on worker #0: unique_value --- no_error_log [error] [crit]