Skip to content

Commit

Permalink
feat(protocol): send worker id to broker
Browse files Browse the repository at this point in the history
### Summary

This is a feature that sends worker info to broker. This feature is needed for
future PRs to implement worker id based broker queues (to replace worker
connection based queues).
  • Loading branch information
bungle committed May 28, 2024
1 parent 21d152d commit 0a9f2bb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 7 deletions.
25 changes: 24 additions & 1 deletion lualib/resty/events/protocol.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
local frame = require "resty.events.frame"
local codec = require "resty.events.codec"

local _recv_frame = frame.recv
local _send_frame = frame.send
local encode = codec.encode
local decode = codec.decode

local ngx = ngx
local worker_id = ngx.worker.id
local worker_pid = ngx.worker.pid
local tcp = ngx.socket.tcp
local re_match = ngx.re.match
local req_sock = ngx.req.socket
Expand Down Expand Up @@ -83,12 +88,22 @@ function _Server.new()

sock:settimeout(DEFAULT_TIMEOUT)

local data, err = _recv_frame(sock)
if err then
return nil, "failed to read worker info from worker: " .. err
end

local info = decode(data)
if not info then
return nil, "invalid worker info: " .. data
end

return setmetatable({
info = info,
sock = sock,
}, _SERVER_MT)
end


local _Client = {
is_closed = is_closed,
is_timeout = is_timeout,
Expand Down Expand Up @@ -153,6 +168,14 @@ function _Client:connect(addr)
end
end -- subsystem == "http"

local _, err = _send_frame(sock, encode({
id = worker_id() or -1,
pid = worker_pid(),
}))
if err then
return nil, "failed to send worker info: " .. err
end

return true
end

Expand Down
6 changes: 3 additions & 3 deletions t/protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 6;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -33,6 +33,7 @@ __DATA__
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
Expand Down Expand Up @@ -84,6 +85,7 @@ GET /test
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: 0
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down Expand Up @@ -122,5 +124,3 @@ addr is nginx.sock
[error]
[crit]
[alert]


7 changes: 4 additions & 3 deletions t/stream-protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 8;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -32,6 +32,7 @@ __DATA__
return
end

ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)
ngx.log(ngx.DEBUG, "stream connect ok")

local data, err = conn:recv_frame()
Expand Down Expand Up @@ -106,6 +107,8 @@ GET /test
--- response_body
world
--- error_log
Worker ID: 0
stream connect ok
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down Expand Up @@ -168,5 +171,3 @@ addr is nginx.sock
[error]
[crit]
[alert]


0 comments on commit 0a9f2bb

Please sign in to comment.