Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(protocol): send worker info (id and pid) to broker #54

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion lualib/resty/events/protocol.lua
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
local frame = require "resty.events.frame"
local codec = require "resty.events.codec"

local _recv_frame = frame.recv
local _send_frame = frame.send
local encode = codec.encode
local decode = codec.decode

local ngx = ngx
local worker_id = ngx.worker.id
local worker_pid = ngx.worker.pid
local tcp = ngx.socket.tcp
local re_match = ngx.re.match
local req_sock = ngx.req.socket
Expand All @@ -18,6 +23,10 @@

-- for high traffic pressure
local DEFAULT_TIMEOUT = 5000 -- 5000ms
local WORKER_INFO = {
id = 0,
pid = 0,
}

local function is_timeout(err)
return err and str_sub(err, -7) == "timeout"
Expand Down Expand Up @@ -83,12 +92,22 @@

sock:settimeout(DEFAULT_TIMEOUT)

local data, err = _recv_frame(sock)
if err then
return nil, "failed to read worker info: " .. err
end

local info, err = decode(data)
if err then
return nil, "invalid worker info received: " .. err
end

return setmetatable({
info = info,
sock = sock,
}, _SERVER_MT)
end


local _Client = {
is_closed = is_closed,
is_timeout = is_timeout,
Expand Down Expand Up @@ -153,6 +172,14 @@
end
end -- subsystem == "http"

WORKER_INFO.id = worker_id() or -1
WORKER_INFO.pid = worker_pid()

local _, err = _send_frame(sock, encode(WORKER_INFO))
if err then
return nil, "failed to send worker info: " .. err
end

return true
end

Expand Down
109 changes: 109 additions & 0 deletions t/protocol-privileged.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
use Test::Nginx::Socket::Lua;

#worker_connections(1014);
master_process_enabled(1);
#log_level('warn');

repeat_each(2);

plan tests => repeat_each() * (blocks() * 11);

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

#no_diff();
#no_long_string();
#master_on();
#workers(2);
check_accum_error_log();
run_tests();

__DATA__

=== TEST 1: sanity: send_frame, recv_frame (with privileged agent)
--- http_config
lua_package_path "../lua-resty-core/lib/?.lua;lualib/?.lua;;";
init_by_lua_block {
local process = require "ngx.process"
process.enable_privileged_agent(100)
}
init_worker_by_lua_block {
local process = require "ngx.process"
if process.type() ~= "privileged agent" then
return
end
ngx.timer.at(0, function()
local conn = require("resty.events.protocol").client.new()

local ok, err = conn:connect("unix:$TEST_NGINX_HTML_DIR/nginx.sock")
if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return
end

local bytes, err = conn:send_frame("hello")
if err then
ngx.log(ngx.ERR, "failed to send data: ", err)
end

local data, err = conn:recv_frame()
if not data or err then
ngx.log(ngx.ERR, "failed to recv data: ", err)
return
end

ngx.log(ngx.DEBUG, data)
ngx.log(ngx.DEBUG, "cli recv len: ", #data)
end)
}
server {
listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
location / {
content_by_lua_block {
local conn, err = require("resty.events.protocol").server.new()
if not conn then
ngx.say("failed to init socket: ", err)
return
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
ngx.say("failed to recv data: ", err)
return
end
ngx.log(ngx.DEBUG, "srv recv data: ", data)

local bytes, err = conn:send_frame("world")
if err then
ngx.say("failed to send data: ", err)
return
end

ngx.log(ngx.DEBUG, "srv send data: world")
}
}
}
--- config
location = /test {
content_by_lua_block {
ngx.say("world")
}
}
--- request
GET /test
--- response_body
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: -1
srv recv data: hello
srv send data: world
world
cli recv len: 5
--- no_error_log
[error]
[crit]
[alert]
4 changes: 3 additions & 1 deletion t/protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 6;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -33,6 +33,7 @@ __DATA__
end

ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade)
ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
Expand Down Expand Up @@ -84,6 +85,7 @@ GET /test
world
--- error_log
Upgrade: Kong-Worker-Events/1
Worker ID: 0
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down
107 changes: 107 additions & 0 deletions t/stream-protocol-privileged.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# vim:set ft= ts=4 sw=4 et fdm=marker:
use Test::Nginx::Socket::Lua;

#worker_connections(1014);
master_process_enabled(1);
#log_level('warn');

repeat_each(2);

plan tests => repeat_each() * (blocks() * 10);

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

#no_diff();
#no_long_string();
#master_on();
#workers(2);
check_accum_error_log();
run_tests();

__DATA__

=== TEST 1: sanity: send_frame, recv_frame (with privileged agent)
--- main_config
stream {
lua_package_path "../lua-resty-core/lib/?.lua;lualib/?.lua;;";
init_by_lua_block {
local process = require "ngx.process"
process.enable_privileged_agent(100)
}
init_worker_by_lua_block {
local process = require "ngx.process"
if process.type() ~= "privileged agent" then
return
end
ngx.timer.at(0, function()
local conn = require("resty.events.protocol").client.new()

local ok, err = conn:connect("unix:$TEST_NGINX_HTML_DIR/nginx.sock")
if not ok then
ngx.log(ngx.ERR, "failed to connect: ", err)
return
end

local bytes, err = conn:send_frame("hello")
if err then
ngx.log(ngx.ERR, "failed to send data: ", err)
end

local data, err = conn:recv_frame()
if not data or err then
ngx.log(ngx.ERR, "failed to recv data: ", err)
return
end

ngx.log(ngx.DEBUG, data)
ngx.log(ngx.DEBUG, "cli recv len: ", #data)
end)
}
server {
listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
content_by_lua_block {
local conn, err = require("resty.events.protocol").server.new()
if not conn then
ngx.say("failed to init socket: ", err)
return
end

ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)

local data, err = conn:recv_frame()
if not data or err then
ngx.say("failed to recv data: ", err)
return
end
ngx.log(ngx.DEBUG, "srv recv data: ", data)

local bytes, err = conn:send_frame("world")
if err then
ngx.say("failed to send data: ", err)
return
end

ngx.log(ngx.DEBUG, "srv send data: world")
}
}
}
--- config
location = /test {
content_by_lua_block {
ngx.say("world")
}
}
--- request
GET /test
--- response_body
world
--- error_log
Worker ID: -1
srv recv data: hello
srv send data: world
world
cli recv len: 5
--- no_error_log
[error]
[crit]
[alert]
5 changes: 4 additions & 1 deletion t/stream-protocol.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua;

repeat_each(2);

plan tests => repeat_each() * (blocks() * 9) - 8;
plan tests => repeat_each() * (blocks() * 9) - 4;

$ENV{TEST_NGINX_HTML_DIR} ||= html_dir();

Expand All @@ -32,6 +32,7 @@ __DATA__
return
end

ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id)
ngx.log(ngx.DEBUG, "stream connect ok")

local data, err = conn:recv_frame()
Expand Down Expand Up @@ -106,6 +107,8 @@ GET /test
--- response_body
world
--- error_log
Worker ID: 0
stream connect ok
srv recv data: hello
srv send data: world
cli recv len: 5
Expand Down
Loading