diff --git a/lualib/resty/events/protocol.lua b/lualib/resty/events/protocol.lua index 638ce84..3375ea6 100644 --- a/lualib/resty/events/protocol.lua +++ b/lualib/resty/events/protocol.lua @@ -1,9 +1,14 @@ local frame = require "resty.events.frame" +local codec = require "resty.events.codec" local _recv_frame = frame.recv local _send_frame = frame.send +local encode = codec.encode +local decode = codec.decode local ngx = ngx +local worker_id = ngx.worker.id +local worker_pid = ngx.worker.pid local tcp = ngx.socket.tcp local re_match = ngx.re.match local req_sock = ngx.req.socket @@ -83,12 +88,22 @@ function _Server.new() sock:settimeout(DEFAULT_TIMEOUT) + local data, err = _recv_frame(sock) + if err then + return nil, "failed to read worker info from worker: " .. err + end + + local info = decode(data) + if not info then + return nil, "invalid worker info: " .. data + end + return setmetatable({ + info = info, sock = sock, }, _SERVER_MT) end - local _Client = { is_closed = is_closed, is_timeout = is_timeout, @@ -153,6 +168,14 @@ function _Client:connect(addr) end end -- subsystem == "http" + local _, err = _send_frame(sock, encode({ + id = worker_id() or -1, + pid = worker_pid(), + })) + if err then + return nil, "failed to send worker info: " .. err + end + return true end diff --git a/t/protocol.t b/t/protocol.t index cb24ced..abe8522 100644 --- a/t/protocol.t +++ b/t/protocol.t @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua; repeat_each(2); -plan tests => repeat_each() * (blocks() * 9) - 6; +plan tests => repeat_each() * (blocks() * 9) - 4; $ENV{TEST_NGINX_HTML_DIR} ||= html_dir(); @@ -33,6 +33,7 @@ __DATA__ end ngx.log(ngx.DEBUG, "Upgrade: ", ngx.var.http_upgrade) + ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id) local data, err = conn:recv_frame() if not data or err then @@ -84,6 +85,7 @@ GET /test world --- error_log Upgrade: Kong-Worker-Events/1 +Worker ID: 0 srv recv data: hello srv send data: world cli recv len: 5 @@ -122,5 +124,3 @@ addr is nginx.sock [error] [crit] [alert] - - diff --git a/t/stream-protocol.t b/t/stream-protocol.t index 8b5a734..756269b 100644 --- a/t/stream-protocol.t +++ b/t/stream-protocol.t @@ -7,7 +7,7 @@ use Test::Nginx::Socket::Lua; repeat_each(2); -plan tests => repeat_each() * (blocks() * 9) - 8; +plan tests => repeat_each() * (blocks() * 9) - 4; $ENV{TEST_NGINX_HTML_DIR} ||= html_dir(); @@ -32,6 +32,7 @@ __DATA__ return end + ngx.log(ngx.DEBUG, "Worker ID: ", conn.info.id) ngx.log(ngx.DEBUG, "stream connect ok") local data, err = conn:recv_frame() @@ -106,6 +107,8 @@ GET /test --- response_body world --- error_log +Worker ID: 0 +stream connect ok srv recv data: hello srv send data: world cli recv len: 5 @@ -168,5 +171,3 @@ addr is nginx.sock [error] [crit] [alert] - -