Skip to content

Commit

Permalink
feat(protocol): send worker id to broker
Browse files Browse the repository at this point in the history
### Summary

This is a feature that sends worker id to broker. This feature is needed for
future PRs to implement worker id based broker queues (to replace worker
connection based queues).
  • Loading branch information
bungle committed May 21, 2024
1 parent ab580f7 commit 400ffaa
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
20 changes: 19 additions & 1 deletion lualib/resty/events/protocol.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local _recv_frame = frame.recv
local _send_frame = frame.send

local ngx = ngx
local worker_id = ngx.worker.id
local tcp = ngx.socket.tcp
local re_match = ngx.re.match
local req_sock = ngx.req.socket
Expand All @@ -13,6 +14,8 @@ local flush = ngx.flush
local subsystem = ngx.config.subsystem

local type = type
local tonumber = tonumber
local tostring = tostring
local str_sub = string.sub
local setmetatable = setmetatable

Expand Down Expand Up @@ -82,12 +85,22 @@ function _Server.new(self)

sock:settimeout(DEFAULT_TIMEOUT)

local data, err = _recv_frame(sock)
if err then
return nil, "failed to read worker id from worker: " .. err
end

local worker_id = tonumber(data, 10)
if not worker_id then
return nil, "invalid worker id: " .. data
end

return setmetatable({
worker_id = worker_id,
sock = sock,
}, _SERVER_MT)
end


local _Client = {
_VERSION = "0.1.0",
is_timeout = is_timeout,
Expand Down Expand Up @@ -154,6 +167,11 @@ function _Client.connect(self, addr)
end
end -- subsystem == "http"

local _, err = send_frame(self, tostring(worker_id() or -1))
if err then
return nil, "failed to worker id: " .. err
end

return true
end

Expand Down
6 changes: 3 additions & 3 deletions t/protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 6;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -33,6 +33,7 @@ __DATA__
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.worker_id)

local data, err = conn:recv_frame()
if not data or err then
Expand Down Expand Up @@ -84,6 +85,7 @@ GET /test
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: 0
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down Expand Up @@ -122,5 +124,3 @@ addr is nginx.sock
[error]
[crit]
[alert]


0 comments on commit 400ffaa

Please sign in to comment.