Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): do not premature kill threads when not exiting #47

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 131 additions & 124 deletions lualib/resty/events/worker.lua
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
local cjson = require "cjson.safe"
local codec = require "resty.events.codec"
local que = require "resty.events.queue"
local queue = require "resty.events.queue"
local callback = require "resty.events.callback"

local frame_validate = require("resty.events.frame").validate
local client = require("resty.events.protocol").client
local is_timeout = client.is_timeout

local frame_validate = require("resty.events.frame").validate

local type = type
local assert = assert
local setmetatable = setmetatable
Expand Down Expand Up @@ -65,22 +64,28 @@ local function random_delay()
return random(10, 50) / 1000
end

local function do_event(self, d)
self._callback:do_event(d)
-- Timer-callback adapter: forwards to the worker's `communicate` method
-- unless the timer fired prematurely.
--
-- `premature` is the first argument the timer passes to its callback and
-- `self` is the extra argument given when the timer was created (see
-- start_timer).  NOTE(review): presumably this is the ngx.timer.at
-- contract, where a truthy `premature` means the worker is shutting
-- down -- confirm against the `timer_at` alias.
local function communicate(premature, self)
    if not premature then
        self:communicate()
    end
end

local function start_timer(self, delay)
assert(timer_at(delay, function(premature)
self:communicate(premature)
end))
assert(timer_at(delay, communicate, self))
end

-- Tells the worker threads whether they should leave their loops.
--
-- Returns true when `self._connected` is falsy; otherwise returns
-- whatever `exiting()` reports.
local function terminating(self)
    if not self._connected then
        return true
    end

    -- keep exactly one return value, like the original `or` expression
    local shutting_down = exiting()
    return shutting_down
end

local check_sock_exist
do
local ffi = require "ffi"
local C = ffi.C
ffi.cdef [[
int access(const char *pathname, int mode);
int access(const char *pathname, int mode);
]]

-- remove prefix 'unix:'
Expand All @@ -94,8 +99,8 @@ function _M.new(opts)
local max_queue_len = opts.max_queue_len

local self = {
_pub_queue = que.new(max_queue_len),
_sub_queue = que.new(max_queue_len),
_pub_queue = queue.new(max_queue_len),
_sub_queue = queue.new(max_queue_len),
_callback = callback.new(),
_connected = nil,
_opts = opts,
Expand All @@ -104,12 +109,106 @@ function _M.new(opts)
return setmetatable(self, _MT)
end

function _M:communicate(premature)
if premature then
-- worker wants to exit
return
end
-- Receive loop: pulls frames from the broker connection, decodes them and
-- pushes the decoded events onto `self._sub_queue` (they are dispatched to
-- callbacks by events_thread).
--
-- Runs until `terminating(self)` becomes true.
--
-- Returns `true` when the loop ends normally, or `nil, err` when
-- `recv_frame` fails with anything other than a timeout.  Missing frames,
-- decode failures and queue push failures are only logged (and only while
-- the worker is not exiting); the loop then carries on.
local function read_thread(self, broker_connection)
    while not terminating(self) do
        local frame, recv_err = broker_connection:recv_frame()

        if recv_err then
            -- a timeout just means nothing arrived yet; try again
            if not is_timeout(recv_err) then
                return nil, "failed to read event: " .. recv_err
            end

        elseif not frame then
            if not exiting() then
                log(ERR, "did not receive event from broker")
            end

        else
            local event_data, decode_err = decode(frame)

            if decode_err then
                if not exiting() then
                    log(ERR, "failed to decode event data: ", decode_err)
                end

            else
                -- got an event data, push to queue, callback in events_thread
                local _, push_err = self._sub_queue:push(event_data)

                if push_err and not exiting() then
                    log(ERR, "failed to store event: ", push_err, ". data is: ",
                             cjson_encode(event_data))
                end
            end
        end
    end -- while not terminating

    return true
end

-- Send loop: pops payloads from `self._pub_queue` and writes each one to
-- the broker connection as a frame.
--
-- Runs until `terminating(self)` becomes true.  After every
-- EVENTS_COUNT_LIMIT frames sent it sleeps for EVENTS_SLEEP_TIME and
-- restarts the count (events rate limiting).
--
-- Returns `true` when the loop ends normally, or `nil, err` when the queue
-- pop fails with a non-timeout error or when `send_frame` reports an error.
local function write_thread(self, broker_connection)
    local sent = 0

    while not terminating(self) do
        local payload, pop_err = self._pub_queue:pop()

        if pop_err then
            -- a timeout only means the queue had nothing to hand out
            if not is_timeout(pop_err) then
                return nil, "semaphore wait error: " .. pop_err
            end

        else
            local _, send_err = broker_connection:send_frame(payload)
            if send_err then
                return nil, "failed to send event: " .. send_err
            end

            -- events rate limiting
            sent = sent + 1
            if sent >= EVENTS_COUNT_LIMIT then
                sleep(EVENTS_SLEEP_TIME)
                sent = 0
            end
        end
    end -- while not terminating

    return true
end

-- Dispatch loop: pops received events from `self._sub_queue` and hands each
-- one to `self._callback:do_event`.
--
-- Runs until `terminating(self)` becomes true.
--
-- Returns `true` when the loop ends normally, or `nil, err` when the queue
-- pop fails with anything other than a timeout.
local function events_thread(self)
    while not terminating(self) do
        local event_data, pop_err = self._sub_queue:pop()

        if not pop_err then
            -- got an event data, callback
            self._callback:do_event(event_data)

            -- yield, not block other threads
            sleep(0)

        elseif not is_timeout(pop_err) then
            return nil, "semaphore wait error: " .. pop_err
        end
        -- on timeout: nothing to dispatch, just loop again
    end -- while not terminating

    return true
end

function _M:communicate()
-- only for testing, skip read/write/events threads
if self._opts.testing == true then
self._connected = true
Expand All @@ -126,9 +225,9 @@ function _M:communicate(premature)
return
end

local conn = assert(client:new())
local broker_connection = assert(client.new())

local ok, err = conn:connect(listening)
local ok, err = broker_connection:connect(listening)

if exiting() then
return
Expand All @@ -143,118 +242,22 @@ function _M:communicate(premature)
return
end

self._connected = true
log(DEBUG, _worker_id, " on (", listening, ") is ready")

local read_thread = spawn(function()
while not exiting() do
local data, err = conn:recv_frame()

if exiting() then
return
end

if err then
if not is_timeout(err) then
return nil, err
end

-- timeout
goto continue
end

if not data then
return nil, "did not receive event from broker"
end

local d, err = decode(data)
if not d then
return nil, "failed to decode event data: " .. err
end

-- got an event data, push to queue, callback in events_thread
local ok, err = self._sub_queue:push(d)
if not ok then
log(ERR, "failed to store event: ", err, ". ",
"data is :", cjson_encode(d))
end

::continue::
end -- while not exiting
end) -- read_thread

local write_thread = spawn(function()
local counter = 0

while not exiting() do
local payload, err = self._pub_queue:pop()

if not payload then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

-- timeout
goto continue
end

if exiting() then
return
end

local _, err = conn:send_frame(payload)
if err then
log(ERR, "failed to send event: ", err)
return
end

-- events rate limiting
counter = counter + 1
if counter >= EVENTS_COUNT_LIMIT then
sleep(EVENTS_SLEEP_TIME)
counter = 0
end

::continue::
end -- while not exiting
end) -- write_thread

local events_thread = spawn(function()
while not exiting() do
local data, err = self._sub_queue:pop()

if not data then
if not is_timeout(err) then
return nil, "semaphore wait error: " .. err
end

-- timeout
goto continue
end

if exiting() then
return
end

-- got an event data, callback
do_event(self, data)

-- yield, not block other threads
sleep(0)

::continue::
end -- while not exiting
end) -- events_thread
self._connected = true

local ok, err, perr = wait(write_thread, read_thread, events_thread)
local read_thread_co = spawn(read_thread, self, broker_connection)
local write_thread_co = spawn(write_thread, self, broker_connection)
local events_thread_co = spawn(events_thread, self)

kill(write_thread)
kill(read_thread)
kill(events_thread)
local ok, err, perr = wait(read_thread_co, write_thread_co, events_thread_co)

self._connected = nil

if exiting() then
kill(read_thread_co)
kill(write_thread_co)
kill(events_thread_co)
return
end

Expand All @@ -266,6 +269,10 @@ function _M:communicate(premature)
log(ERR, "event worker failed: ", perr)
end

wait(read_thread_co)
wait(write_thread_co)
wait(events_thread_co)

start_timer(self, random_delay())
end

Expand Down Expand Up @@ -335,7 +342,7 @@ function _M:publish(target, source, event, data)
if self._opts.testing == true then
log(DEBUG, "event published to 1 workers")

do_event(self, {
self._callback:do_event({
source = source,
event = event,
data = data,
Expand Down Expand Up @@ -368,7 +375,7 @@ end
function _M:subscribe(source, event, callback)
assert(type(source) == "string" and source ~= "", "source is required")
assert(type(event) == "string" and event ~= "", "event is required")
assert(type(callback) == "function", "expected function, got: "..
assert(type(callback) == "function", "expected function, got: " ..
type(callback))

return self._callback:subscribe(source, event, callback)
Expand Down
Loading
Loading