feat(cluster) record and log errors causing nodes to be marked as DOWN #129

Merged
merged 1 commit on May 9, 2019
65 changes: 43 additions & 22 deletions lib/resty/cassandra/cluster.lua
@@ -42,8 +42,9 @@ end
-----------------------------------------

local function set_peer(self, host, up, reconn_delay, unhealthy_at,
data_center, release_version)
data_center, connect_err, release_version)
data_center = data_center or ''
connect_err = connect_err or ''
release_version = release_version or ''

-- host status
@@ -53,8 +54,9 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at,
end

-- host info
local marshalled = fmt("%d:%d:%d:%s%s", reconn_delay, unhealthy_at,
#data_center, data_center, release_version)
local marshalled = fmt("%d:%d:%d:%d:%s%s%s", reconn_delay, unhealthy_at,
#data_center, #connect_err, data_center, connect_err,
release_version)

ok, err = self.shm:safe_set(_rec_key..host, marshalled)
if not ok then
@@ -65,7 +67,7 @@ local function set_peer(self, host, up, reconn_delay, unhealthy_at,
end

local function add_peer(self, host, data_center)
return set_peer(self, host, true, 0, 0, data_center, "")
return set_peer(self, host, true, 0, 0, data_center, nil, nil)
end

local function get_peer(self, host, status)
@@ -86,23 +88,28 @@ local function get_peer(self, host, status)
local sep_1 = find(marshalled, ":", 1, true)
local sep_2 = find(marshalled, ":", sep_1 + 1, true)
local sep_3 = find(marshalled, ":", sep_2 + 1, true)
local sep_4 = find(marshalled, ":", sep_3 + 1, true)

local reconn_delay = sub(marshalled, 1, sep_1 - 1)
local unhealthy_at = sub(marshalled, sep_1 + 1, sep_2 - 1)
local data_center_len = sub(marshalled, sep_2 + 1, sep_3 - 1)
local err_len = sub(marshalled, sep_3 + 1, sep_4 - 1)

local data_center_last = sep_3 + tonumber(data_center_len)
local data_center_last = sep_4 + tonumber(data_center_len)
local err_last = data_center_last + tonumber(err_len)

local data_center = sub(marshalled, sep_3 + 1, data_center_last)
local release_version = sub(marshalled, data_center_last + 1)
local data_center = sub(marshalled, sep_4 + 1, data_center_last)
local err_conn = sub(marshalled, data_center_last + 1, err_last)
local release_version = sub(marshalled, err_last + 1)

return {
up = status,
host = host,
data_center = data_center ~= '' and data_center or nil,
release_version = release_version ~= '' and release_version or nil,
reconn_delay = tonumber(reconn_delay),
unhealthy_at = tonumber(unhealthy_at)
unhealthy_at = tonumber(unhealthy_at),
err = err_conn,
}
end

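For reference, the record stored in the shm now carries a fourth length field and the connect error between the data center and the release version. Below is a minimal standalone round-trip sketch of that layout; the marshall/unmarshall names and the sample values are illustrative only, not part of the library:

local fmt, find, sub = string.format, string.find, string.sub

-- serialize a peer record the way set_peer() does after this change
local function marshall(reconn_delay, unhealthy_at, data_center, connect_err, release_version)
  data_center     = data_center or ''
  connect_err     = connect_err or ''
  release_version = release_version or ''
  return fmt("%d:%d:%d:%d:%s%s%s", reconn_delay, unhealthy_at,
             #data_center, #connect_err, data_center, connect_err,
             release_version)
end

-- parse it back the way get_peer() does: two numeric fields, two length
-- prefixes, then the three variable-length strings concatenated
local function unmarshall(marshalled)
  local sep_1 = find(marshalled, ":", 1, true)
  local sep_2 = find(marshalled, ":", sep_1 + 1, true)
  local sep_3 = find(marshalled, ":", sep_2 + 1, true)
  local sep_4 = find(marshalled, ":", sep_3 + 1, true)

  local data_center_len  = tonumber(sub(marshalled, sep_2 + 1, sep_3 - 1))
  local err_len          = tonumber(sub(marshalled, sep_3 + 1, sep_4 - 1))
  local data_center_last = sep_4 + data_center_len
  local err_last         = data_center_last + err_len

  return {
    reconn_delay    = tonumber(sub(marshalled, 1, sep_1 - 1)),
    unhealthy_at    = tonumber(sub(marshalled, sep_1 + 1, sep_2 - 1)),
    data_center     = sub(marshalled, sep_4 + 1, data_center_last),
    err             = sub(marshalled, data_center_last + 1, err_last),
    release_version = sub(marshalled, err_last + 1),
  }
end

-- marshall(1000, 1461030739000, 'dc1', 'timeout', '3.11.4')
--   -> "1000:1461030739000:3:7:dc1timeout3.11.4"
local rec = unmarshall(marshall(1000, 1461030739000, 'dc1', 'timeout', '3.11.4'))
print(rec.err, rec.release_version) -- timeout   3.11.4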
@@ -130,7 +137,7 @@ local function delete_peer(self, host)
self.shm:delete(host) -- status bool
end

local function set_peer_down(self, host)
local function set_peer_down(self, host, connect_err)
if self.logging then
log(WARN, _log_prefix, 'setting host at ', host, ' DOWN')
end
@@ -139,7 +146,7 @@ local function set_peer_down(self, host)
peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

return set_peer(self, host, false, self.reconn_policy:next_delay(host), get_now(),
peer.data_center, peer.release_version)
peer.data_center, connect_err, peer.release_version)
end

local function set_peer_up(self, host)
@@ -152,7 +159,7 @@ local function set_peer_up(self, host)
peer = peer or empty_t -- this can be called from refresh() so no host in shm yet

return set_peer(self, host, true, 0, 0,
peer.data_center, peer.release_version)
peer.data_center, nil, peer.release_version)
end

local function can_try_peer(self, host)
@@ -163,7 +170,8 @@ local function can_try_peer(self, host)
-- reconnection policy steps in before making a decision
local peer_rec, err = get_peer(self, host, up)
if not peer_rec then return nil, err end
return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay, nil, true
return get_now() - peer_rec.unhealthy_at >= peer_rec.reconn_delay,
nil, true, peer_rec
end
end

@@ -191,7 +199,7 @@ local function check_peer_health(self, host, coordinator_options, retry)
if not peer then return nil, err
else
peer:settimeout(self.timeout_connect)
local ok, err, maybe_down = peer:connect()
local ok, err_conn, maybe_down = peer:connect()
if ok then
-- host is healthy
if retry then
@@ -205,12 +213,12 @@ local function check_peer_health(self, host, coordinator_options, retry)
return peer
elseif maybe_down then
-- host is not (or still not) responsive
local ok, shm_err = set_peer_down(self, host)
local ok, shm_err = set_peer_down(self, host, err_conn)
if not ok then return nil, 'error setting host down: '..shm_err end

return nil, 'host seems unhealthy, considering it down ('..err..')'
return nil, 'host seems unhealthy, considering it down ('..err_conn..')'
else
return nil, err
return nil, err_conn
end
end
end
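In other words, the connect error returned by peer:connect() is now both surfaced immediately (e.g. 'host seems unhealthy, considering it down (timeout)') and handed to set_peer_down() so it gets persisted in the peer's shm record for later reporting.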
@@ -419,7 +427,7 @@ local function next_coordinator(self, coordinator_options)
local errors = {}

for _, peer_rec in self.lb_policy:iter() do
local ok, err, retry = can_try_peer(self, peer_rec.host)
local ok, err, retry, peer_state = can_try_peer(self, peer_rec.host)
if ok then
local peer, err = check_peer_health(self, peer_rec.host, coordinator_options, retry)
if peer then
@@ -433,7 +441,19 @@ local function next_coordinator(self, coordinator_options)
elseif err then
return nil, err
else
errors[peer_rec.host] = 'host still considered down'
local s = 'host still considered down'
if peer_state then
local waited = get_now() - peer_state.unhealthy_at
s = s .. ' for ' .. (peer_state.reconn_delay - waited) / 1000 .. 's'

if peer_state.err and peer_state.err ~= '' then
s = s .. ' (last error: ' .. peer_state.err .. ')'
else
s = s .. ' (last error: not recorded)'
end
end

errors[peer_rec.host] = s
end
end

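To make the resulting message concrete: get_now() and unhealthy_at are in milliseconds here, so a peer marked DOWN 200 ms ago with a 1000 ms reconnection delay and a recorded 'timeout' connect error yields an entry along the lines of:

host still considered down for 0.8s (last error: timeout)

and 'host still considered down for 0.8s (last error: not recorded)' when no error was stored, as exercised by the tests below.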
@@ -540,7 +560,8 @@ function _Cluster:refresh()
end

local ok, err = set_peer(self, host, up, reconn_delay, unhealthy_at,
rows[i].data_center, rows[i].release_version)
rows[i].data_center, nil,
rows[i].release_version)
if not ok then return nil, err end
end
end
@@ -755,9 +776,9 @@ local function handle_error(self, err, cql_code, coordinator, request)
end
else
-- host seems down?
local ok, err = set_peer_down(self, coordinator.host)
if not ok then return nil, err end
return self:send_retry(request, 'coordinator seems down')
local ok, err2 = set_peer_down(self, coordinator.host, err)
if not ok then return nil, err2 end
return self:send_retry(request, 'coordinator seems down (' .. err .. ')')
end

return nil, err, cql_code
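With the coordinator's original error forwarded here as well, the retry reason reported by send_retry() now includes it, e.g. 'coordinator seems down (timeout)' rather than the bare 'coordinator seems down'.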
62 changes: 54 additions & 8 deletions t/06-cluster.t
@@ -426,8 +426,8 @@ init: true
end

-- insert fake peers
cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', '0.0')
cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', '0.0')
cluster:set_peer('127.0.0.253', true, 0, 0, 'foocenter1', nil, '0.0')
cluster:set_peer('127.0.0.254', true, 0, 0, 'foocenter1', nil, '0.0')

local ok, err = cluster:refresh()
if not ok then
Expand Down Expand Up @@ -482,7 +482,7 @@ info: no host details for 127.0.0.254
end

-- insert previous peers with some infos
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '')
cluster:set_peer('127.0.0.1', false, 1000, 1461030739000, '', '', '')

local ok, err = cluster:refresh()
if not ok then
@@ -987,6 +987,52 @@ coordinator 3: 127.0.0.1
end
end

ngx.sleep(0.2)

local coordinator, err = cluster:next_coordinator()
if not coordinator then
ngx.say(err)
end
}
}
--- request
GET /t
--- response_body_like chomp
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)\. 127\.0\.0\.\d+: host still considered down for 0\.[678]\d+s \(last error: not recorded\)
--- no_error_log
[error]



=== TEST 23: next_coordinator() returns no host available errors with recorded errors
--- http_config eval: $::HttpConfig
--- config
location /t {
content_by_lua_block {
local Cluster = require 'resty.cassandra.cluster'
local cluster, err = Cluster.new()
if not cluster then
ngx.log(ngx.ERR, err)
end

local ok, err = cluster:refresh()
if not ok then
ngx.log(ngx.ERR, err)
end

local peers, err = cluster:get_peers()
if not peers then
ngx.log(ngx.ERR, err)
end

for i = 1, #peers do
local ok, err = cluster:set_peer_down(peers[i].host, "timeout")
if not ok then
ngx.log(ngx.ERR, err)
return
end
end

local coordinator, err = cluster:next_coordinator()
if not coordinator then
ngx.say(err)
@@ -996,13 +1042,13 @@ coordinator 3: 127.0.0.1
--- request
GET /t
--- response_body_like chomp
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down\. 127\.0\.0\.\d+: host still considered down
all hosts tried for query failed\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)\. 127\.0\.0\.\d+: host still considered down for 1s \(last error: timeout\)
--- no_error_log
[error]



=== TEST 23: next_coordinator() avoids down hosts
=== TEST 24: next_coordinator() avoids down hosts
--- http_config eval: $::HttpConfig
--- config
location /t {
@@ -1046,7 +1092,7 @@ GET /t



=== TEST 24: next_coordinator() marks nodes as down
=== TEST 25: next_coordinator() marks nodes as down
--- http_config eval
qq {
lua_socket_log_errors off;
@@ -1114,7 +1160,7 @@ can try peer 255.255.255.253: false



=== TEST 25: next_coordinator() retries down host as per reconnection policy and ups them back
=== TEST 26: next_coordinator() retries down host as per reconnection policy and ups them back
--- http_config eval: $::HttpConfig
--- config
location /t {
@@ -1189,7 +1235,7 @@ GET /t



=== TEST 26: next_coordinator() sets coordinator keyspace on connect
=== TEST 27: next_coordinator() sets coordinator keyspace on connect
--- http_config eval: $::HttpConfig
--- config
location /t {