Skip to content

Commit

Permalink
fix(worker): do not prematurely kill threads when not exiting
Browse files Browse the repository at this point in the history
  • Loading branch information
bungle committed May 27, 2024
1 parent ab580f7 commit d3bfdd0
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 124 deletions.
255 changes: 131 additions & 124 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local que = require "resty.events.queue"
local queue = require "resty.events.queue"
local callback = require "resty.events.callback"

local frame_validate = require("resty.events.frame").validate
local client = require("resty.events.protocol").client
local is_timeout = client.is_timeout

local frame_validate = require("resty.events.frame").validate

local type = type
local assert = assert
local setmetatable = setmetatable
Expand Down Expand Up @@ -65,22 +64,28 @@ local function random_delay()
return random(10, 50) / 1000
end

local function do_event(self, d)
self._callback:do_event(d)
local function communicate(premature, self)
if premature then
return
end

self:communicate()
end

local function start_timer(self, delay)
assert(timer_at(delay, function(premature)
self:communicate(premature)
end))
assert(timer_at(delay, communicate, self))
end

-- Tells the worker loops whether they should stop iterating.
--
-- @param self worker object; only its `_connected` field is read
-- @return true when `self._connected` is anything other than exactly `true`;
--         otherwise whatever `exiting()` reports (presumably the nginx
--         worker-exiting flag; `exiting` is defined outside this block)
local function terminating(self)
  if self._connected ~= true then
    return true
  end

  -- parentheses keep the result to a single value, like the `or` form would
  return (exiting())
end

local check_sock_exist
do
local ffi = require "ffi"
local C = ffi.C
ffi.cdef [[
int access(const char *pathname, int mode);
int access(const char *pathname, int mode);
]]

-- remove prefix 'unix:'
Expand All @@ -94,8 +99,8 @@ function _M.new(opts)
local max_queue_len = opts.max_queue_len

local self = {
_pub_queue = que.new(max_queue_len),
_sub_queue = que.new(max_queue_len),
_pub_queue = queue.new(max_queue_len),
_sub_queue = queue.new(max_queue_len),
_callback = callback.new(),
_connected = nil,
_opts = opts,
Expand All @@ -104,12 +109,106 @@ function _M.new(opts)
return setmetatable(self, _MT)
end

function _M:communicate(premature)
if premature then
-- worker wants to exit
return
end
-- Receive loop: pulls frames from the broker connection, decodes them and
-- stores the decoded events in `self._sub_queue` (the callbacks are run
-- later from that queue, not here).
--
-- Runs until `terminating(self)` becomes true. Empty frames, decode failures
-- and queue-push failures are logged (unless the worker is exiting) and the
-- loop carries on; only a non-timeout receive error ends the loop early.
--
-- @param self              worker object (uses `self._sub_queue`)
-- @param broker_connection connection object providing `recv_frame()`
-- @return true when the loop stops because of `terminating(self)`
-- @return nil, err_message on a receive error that is not a timeout
local function read_thread(self, broker_connection)
  while not terminating(self) do
    local frame, recv_err = broker_connection:recv_frame()

    if recv_err then
      if not is_timeout(recv_err) then
        return nil, "failed to read event: " .. recv_err
      end
      -- plain timeout: nothing to do, poll again

    elseif not frame then
      if not exiting() then
        log(ERR, "did not receive event from broker")
      end

    else
      local event, decode_err = decode(frame)

      if decode_err then
        if not exiting() then
          log(ERR, "failed to decode event data: ", decode_err)
        end

      else
        -- hand the event over to the subscriber queue
        local _, push_err = self._sub_queue:push(event)
        if push_err and not exiting() then
          log(ERR, "failed to store event: ", push_err, ". data is: ",
                   cjson_encode(event))
        end
      end
    end
  end -- while not terminating

  return true
end

-- Send loop: takes payloads off `self._pub_queue` and writes them to the
-- broker connection, pausing after every EVENTS_COUNT_LIMIT sent frames.
--
-- Runs until `terminating(self)` becomes true.
--
-- @param self              worker object (uses `self._pub_queue`)
-- @param broker_connection connection object providing `send_frame(payload)`
-- @return true when the loop stops because of `terminating(self)`
-- @return nil, err_message when popping fails with a non-timeout error or
--         when sending a frame fails
local function write_thread(self, broker_connection)
  local sent_since_pause = 0

  while not terminating(self) do
    local payload, pop_err = self._pub_queue:pop()

    if pop_err then
      if not is_timeout(pop_err) then
        return nil, "semaphore wait error: " .. pop_err
      end
      -- plain timeout: nothing queued, poll again

    else
      local _, send_err = broker_connection:send_frame(payload)
      if send_err then
        return nil, "failed to send event: " .. send_err
      end

      -- rate limiting: sleep once the per-burst budget is used up
      sent_since_pause = sent_since_pause + 1
      if sent_since_pause >= EVENTS_COUNT_LIMIT then
        sleep(EVENTS_SLEEP_TIME)
        sent_since_pause = 0
      end
    end
  end -- while not terminating

  return true
end

-- Dispatch loop: pops events from `self._sub_queue` and passes each one to
-- `self._callback:do_event`.
--
-- Runs until `terminating(self)` becomes true.
--
-- @param self worker object (uses `self._sub_queue` and `self._callback`)
-- @return true when the loop stops because of `terminating(self)`
-- @return nil, err_message when popping fails with a non-timeout error
local function events_thread(self)
  while not terminating(self) do
    local event, pop_err = self._sub_queue:pop()

    if not pop_err then
      -- got an event, run the callbacks
      self._callback:do_event(event)

      -- yield so the other threads are not blocked
      sleep(0)

    elseif not is_timeout(pop_err) then
      return nil, "semaphore wait error: " .. pop_err
    end
    -- a timeout simply falls through to the next iteration
  end -- while not terminating

  return true
end

function _M:communicate()
-- only for testing, skip read/write/events threads
if self._opts.testing == true then
self._connected = true
Expand All @@ -126,9 +225,9 @@ function _M:communicate(premature)
return
end

local conn = assert(client:new())
local broker_connection = assert(client.new())

local ok, err = conn:connect(listening)
local ok, err = broker_connection:connect(listening)

if exiting() then
return
Expand All @@ -143,118 +242,22 @@ function _M:communicate(premature)
return
end

self._connected = true
log(DEBUG, _worker_id, " on (", listening, ") is ready")

local read_thread = spawn(function()
while not exiting() do
local data, err = conn:recv_frame()

if exiting() then
return
end

if err then
if not is_timeout(err) then
return nil, err
end

-- timeout
goto continue
end

if not data then
return nil, "did not receive event from broker"
end

local d, err = decode(data)
if not d then
return nil, "failed to decode event data: " .. err
end

-- got an event data, push to queue, callback in events_thread
local ok, err = self._sub_queue:push(d)
if not ok then
log(ERR, "failed to store event: ", err, ". ",
"data is :", cjson_encode(d))
end

::continue::
end -- while not exiting
end) -- read_thread

local write_thread = spawn(function()
local counter = 0

while not exiting() do
local payload, err = self._pub_queue:pop()

if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

-- timeout
goto continue
end

if exiting() then
return
end

local _, err = conn:send_frame(payload)
if err then
log(ERR, "failed to send event: ", err)
return
end

-- events rate limiting
counter = counter + 1
if counter >= EVENTS_COUNT_LIMIT then
sleep(EVENTS_SLEEP_TIME)
counter = 0
end

::continue::
end -- while not exiting
end) -- write_thread

local events_thread = spawn(function()
while not exiting() do
local data, err = self._sub_queue:pop()

if not data then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

-- timeout
goto continue
end

if exiting() then
return
end

-- got an event data, callback
do_event(self, data)

-- yield, not block other threads
sleep(0)

::continue::
end -- while not exiting
end) -- events_thread
self._connected = true

local ok, err, perr = wait(write_thread, read_thread, events_thread)
local read_thread_co = spawn(read_thread, self, broker_connection)
local write_thread_co = spawn(write_thread, self, broker_connection)
local events_thread_co = spawn(events_thread, self)

kill(write_thread)
kill(read_thread)
kill(events_thread)
local ok, err, perr = wait(read_thread_co, write_thread_co, events_thread_co)

self._connected = nil

if exiting() then
kill(read_thread_co)
kill(write_thread_co)
kill(events_thread_co)
return
end

Expand All @@ -266,6 +269,10 @@ function _M:communicate(premature)
log(ERR, "event worker failed: ", perr)
end

wait(read_thread_co)
wait(write_thread_co)
wait(events_thread_co)

start_timer(self, random_delay())
end

Expand Down Expand Up @@ -335,7 +342,7 @@ function _M:publish(target, source, event, data)
if self._opts.testing == true then
log(DEBUG, "event published to 1 workers")

do_event(self, {
self._callback:do_event({
source = source,
event = event,
data = data,
Expand Down Expand Up @@ -368,7 +375,7 @@ end
function _M:subscribe(source, event, callback)
assert(type(source) == "string" and source ~= "", "source is required")
assert(type(event) == "string" and event ~= "", "event is required")
assert(type(callback) == "function", "expected function, got: "..
assert(type(callback) == "function", "expected function, got: " ..
type(callback))

return self._callback:subscribe(source, event, callback)
Expand Down
Loading

0 comments on commit d3bfdd0

Please sign in to comment.