Skip to content

Commit

Permalink
feat(protocol): send worker id to broker
Browse files Browse the repository at this point in the history
### Summary

This is a feature that sends worker info to broker. This feature is needed
for future PRs to implement worker id based broker queues (to replace
worker connection based queues).
  • Loading branch information
bungle committed Jun 18, 2024
1 parent c2fdfd2 commit e649ea0
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 3 deletions.
29 changes: 28 additions & 1 deletion lualib/resty/events/protocol.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
local frame = require "resty.events.frame"
local codec = require "resty.events.codec"

local _recv_frame = frame.recv
local _send_frame = frame.send
local encode = codec.encode
local decode = codec.decode

local ngx = ngx
local worker_id = ngx.worker.id
local worker_pid = ngx.worker.pid
local tcp = ngx.socket.tcp
local re_match = ngx.re.match
local req_sock = ngx.req.socket
Expand All @@ -18,6 +23,10 @@ local setmetatable = setmetatable

-- for high traffic pressure
local DEFAULT_TIMEOUT = 5000 -- 5000ms
local WORKER_INFO = {
id = 0,
pid = 0,
}

local function is_timeout(err)
return err and str_sub(err, -7) == "timeout"
Expand Down Expand Up @@ -83,12 +92,22 @@ function _Server.new()

sock:settimeout(DEFAULT_TIMEOUT)

local data, err = _recv_frame(sock)
if err then
return nil, "failed to read worker info: " .. err
end

local info, err = decode(data)
if err then
return nil, "invalid worker info received: " .. err
end

return setmetatable({
info = info,
sock = sock,
}, _SERVER_MT)
end


local _Client = {
is_closed = is_closed,
is_timeout = is_timeout,
Expand Down Expand Up @@ -153,6 +172,14 @@ function _Client:connect(addr)
end
end -- subsystem == "http"

WORKER_INFO.id = worker_id() or -1
WORKER_INFO.pid = worker_pid()

local _, err = _send_frame(sock, encode(WORKER_INFO))
if err then
return nil, "failed to send worker info: " .. err
end

return true
end

Expand Down
109 changes: 109 additions & 0 deletions t/protocol-privileged.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
use Test::Nginx::Socket::Lua;

#worker_connections(1014);
master_process_enabled(1);
#log_level('warn');

repeat_each(2);

plan tests => repeat_each() * (blocks() * 11);

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

#no_diff();
#no_long_string();
#master_on();
#workers(2);
check_accum_error_log();
run_tests();

__DATA__

=== TEST 1: sanity: send_frame, recv_frame (with privileged agent)
--- http_config
lua_package_path "../lua-resty-core/lib/?.lua;lualib/?.lua;;";
init_by_lua_block {
local process = require "ngx.process"
process.enable_privileged_agent(100)
}
init_worker_by_lua_block {
local process = require "ngx.process"
if process.type() ~= "privileged agent" then
return
end
ngx.timer.at(0, function()
local conn = require("resty.events.protocol").client.new()

local ok, err = conn:connect("unix:$TEST_NGINX_HTML_DIR/nginx.sock")
if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return
end

local bytes, err = conn:send_frame("hello")
if err then
ngx.log(ngx.ERR, "failed to send data: ", err)
end

local data, err = conn:recv_frame()
if not data or err then
ngx.log(ngx.ERR, "failed to recv data: ", err)
return
end

ngx.log(ngx.DEBUG, data)
ngx.log(ngx.DEBUG, "cli recv len: ", #data)
end)
}
server {
listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
location / {
content_by_lua_block {
local conn, err = require("resty.events.protocol").server.new()
if not conn then
ngx.say("failed to init socket: ", err)
return
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
ngx.say("failed to recv data: ", err)
return
end
ngx.log(ngx.DEBUG, "srv recv data: ", data)

local bytes, err = conn:send_frame("world")
if err then
ngx.say("failed to send data: ", err)
return
end

ngx.log(ngx.DEBUG, "srv send data: world")
}
}
}
--- config
location = /test {
content_by_lua_block {
ngx.say("world")
}
}
--- request
GET /test
--- response_body
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: -1
srv recv data: hello
srv send data: world
world
cli recv len: 5
--- no_error_log
[error]
[crit]
[alert]
4 changes: 3 additions & 1 deletion t/protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 6;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -33,6 +33,7 @@ __DATA__
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
Expand Down Expand Up @@ -84,6 +85,7 @@ GET /test
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: 0
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down
107 changes: 107 additions & 0 deletions t/stream-protocol-privileged.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
use Test::Nginx::Socket::Lua;

#worker_connections(1014);
master_process_enabled(1);
#log_level('warn');

repeat_each(2);

plan tests => repeat_each() * (blocks() * 10);

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

#no_diff();
#no_long_string();
#master_on();
#workers(2);
check_accum_error_log();
run_tests();

__DATA__

=== TEST 1: sanity: send_frame, recv_frame (with privileged agent)
--- main_config
stream {
lua_package_path "../lua-resty-core/lib/?.lua;lualib/?.lua;;";
init_by_lua_block {
local process = require "ngx.process"
process.enable_privileged_agent(100)
}
init_worker_by_lua_block {
local process = require "ngx.process"
if process.type() ~= "privileged agent" then
return
end
ngx.timer.at(0, function()
local conn = require("resty.events.protocol").client.new()

local ok, err = conn:connect("unix:$TEST_NGINX_HTML_DIR/nginx.sock")
if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return
end

local bytes, err = conn:send_frame("hello")
if err then
ngx.log(ngx.ERR, "failed to send data: ", err)
end

local data, err = conn:recv_frame()
if not data or err then
ngx.log(ngx.ERR, "failed to recv data: ", err)
return
end

ngx.log(ngx.DEBUG, data)
ngx.log(ngx.DEBUG, "cli recv len: ", #data)
end)
}
server {
listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
content_by_lua_block {
local conn, err = require("resty.events.protocol").server.new()
if not conn then
ngx.say("failed to init socket: ", err)
return
end

ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
ngx.say("failed to recv data: ", err)
return
end
ngx.log(ngx.DEBUG, "srv recv data: ", data)

local bytes, err = conn:send_frame("world")
if err then
ngx.say("failed to send data: ", err)
return
end

ngx.log(ngx.DEBUG, "srv send data: world")
}
}
}
--- config
location = /test {
content_by_lua_block {
ngx.say("world")
}
}
--- request
GET /test
--- response_body
world
--- error_log
Worker ID: -1
srv recv data: hello
srv send data: world
world
cli recv len: 5
--- no_error_log
[error]
[crit]
[alert]
5 changes: 4 additions & 1 deletion t/stream-protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 8;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -32,6 +32,7 @@ __DATA__
return
end

ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)
ngx.log(ngx.DEBUG, "stream connect ok")

local data, err = conn:recv_frame()
Expand Down Expand Up @@ -106,6 +107,8 @@ GET /test
--- response_body
world
--- error_log
Worker ID: 0
stream connect ok
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down

0 comments on commit e649ea0

Please sign in to comment.