Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

fix(timers) reduce timers used #112

Merged
merged 3 commits into from
Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions spec/balancer/generic_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1662,8 +1662,8 @@ for algorithm, balancer_module in helpers.balancer_types() do


it("recovers when dns entries are replaced by healthy ones", function()
local record = dnsA({
{ name = "getkong.org", address = "1.2.3.4" },
dnsA({
{ name = "getkong.org", address = "1.2.3.4", ttl = 2 },
})
b:addHost("getkong.org", 8000, 50)
assert.not_nil(b:getPeer())
Expand All @@ -1677,11 +1677,10 @@ for algorithm, balancer_module in helpers.balancer_types() do
}
)

-- expire DNS and add a new backend IP
-- update DNS with a new backend IP
-- balancer should now recover since a new healthy backend is available
record.expire = 0
dnsA({
{ name = "getkong.org", address = "5.6.7.8" },
{ name = "getkong.org", address = "5.6.7.8", ttl = 60 },
})

local timeout = ngx.now() + 5 -- we'll try for 5 seconds
Expand Down Expand Up @@ -1873,8 +1872,8 @@ for algorithm, balancer_module in helpers.balancer_types() do
})
b:addHost("127.0.0.1", 8000, 100)

local test_table = setmetatable({}, { __mode = "k" })
test_table[b] = true
local test_table = setmetatable({}, { __mode = "v" })
test_table.key = b
assert.not_nil(next(test_table))

-- destroy it
Expand All @@ -1884,7 +1883,8 @@ for algorithm, balancer_module in helpers.balancer_types() do
collectgarbage()
collectgarbage()
--assert.is_nil(next(test_table)) -- doesn't work, hangs if failed, luassert bug
assert.equal("nil", tostring(next(test_table)))
assert.is_nil(test_table.key)
assert.equal("nil", tostring(test_table.key))
end)

end)
Expand Down
104 changes: 76 additions & 28 deletions src/resty/dns/balancer/base.lua
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,13 @@ local errors = setmetatable({
})


-- global binary heap for all balancers to share as a single update timer for
-- renewing DNS records
local renewal_heap = require("binaryheap").minUnique()
local renewal_weak_cache = setmetatable({}, { __mode = "v" })
local renewal_timer --luacheck: ignore


local _M = {}


Expand Down Expand Up @@ -427,6 +434,55 @@ local function assert_atomicity(f, self, ...)
end


-- Timer invoked to update DNS records
local function resolve_timer_callback()
local now = time()
--print("running timer:",tostring(renewal_heap:peekValue()), " ", now)

while (renewal_heap:peekValue() or math.huge) < now do
local key = renewal_heap:pop()
local host = renewal_weak_cache[key] --- can return nil if GC'ed

--print("timer on: ",key, " the value is: ", tostring((host or EMPTY).hostname))
if host then
ngx_log(ngx_DEBUG, host.balancer.log_prefix, "executing requery for: ", host.hostname)
host:queryDns(false) -- timer-context; cacheOnly always false
end
end
end



-- schedules a DNS update for a host in the global timer queue. This uses only
-- a single timer for all balancers.
-- IMPORTANT: this construct should not prevent GC of the Host object
local function schedule_dns_renewal(host)
local record_expiry = (host.lastQuery or EMPTY).expire or 0
local key = host.balancer.id .. ":" .. host.hostname .. ":" .. host.port

local new_renew_at = record_expiry + 0.05 -- ensure expired, but within stale_ttl
local old_renew_at = renewal_heap:valueByPayload(key)

-- always store the host in the registry, because the same key might be reused
-- by a new host-object for the same hostname in case of quick delete/add sequence
renewal_weak_cache[key] = host

if old_renew_at then
renewal_heap:update(key, new_renew_at)
else
renewal_heap:insert(new_renew_at, key)
end
end


-- remove a Host from the DNS renewal timer
local function cancel_dns_renewal(host)
local key = host.balancer.id .. ":" .. host.hostname .. ":" .. host.port
renewal_weak_cache[key] = nil
renewal_heap:remove(key)
end


local function update_dns_result(self, newQuery, dns)
local oldQuery = self.lastQuery or {}
local oldSorted = self.lastSorted or {}
Expand Down Expand Up @@ -607,6 +663,8 @@ function objHost:queryDns(cacheOnly)

assert_atomicity(update_dns_result, self, newQuery, dns)

schedule_dns_renewal(self)

return true
end

Expand Down Expand Up @@ -727,9 +785,12 @@ function objHost:delete()

for i = #self.addresses, 1, -1 do -- reverse traversal as we're deleting
self.addresses[i]:delete()
self.addresses[i] = nil
end

self.balancer = nil
self.lastQuery = nil
self.lastSorted = nil
end


Expand Down Expand Up @@ -1023,6 +1084,8 @@ function objBalancer:removeHost(hostname, port)

ngx_log(ngx_DEBUG, self.log_prefix, "removing host ", hostname, ":", port)

cancel_dns_renewal(host)

-- set weights to 0
host:disable()

Expand Down Expand Up @@ -1254,20 +1317,6 @@ function objBalancer:setHostStatus(available, hostname, port)
end


-- Timer invoked to update DNS records
function objBalancer:resolveTimerCallback()
--check all hosts for expired records, including those with errors
--we update, so changes on the list while traversing can happen, keep track of that

for _, host in ipairs(self.hosts) do
if ((host.lastQuery or EMPTY).expire or 0) < time() then
ngx_log(ngx_DEBUG, self.log_prefix, "executing requery for: ", host.hostname)
host:queryDns(false) -- timer-context; cacheOnly always false
end
end
end


--- Sets an event callback for user code. The callback is invoked for
-- every address added to/removed from the balancer, and on health changes.
--
Expand Down Expand Up @@ -1417,6 +1466,7 @@ _M.new = function(opts)
balancer_id_counter = balancer_id_counter + 1
local self = {
-- properties
id = balancer_id_counter,
log_prefix = "[" .. (opts.log_prefix or "balancer") .. " " .. tostring(balancer_id_counter) .. "] ",
hosts = {}, -- a list a host objects
addresses = {}, -- a list of addresses, including reverse lookup
Expand All @@ -1435,24 +1485,22 @@ _M.new = function(opts)

self:setCallback(opts.callback or function() end) -- callback for address mutations

do
local err
self.resolveTimer, err = resty_timer({
recurring = true,
interval = 1, -- check for expired records every 1 second
detached = false,
expire = self.resolveTimerCallback,
}, self)

if not self.resolveTimer then
return nil, "failed to create timer for background DNS resolution: " .. err
end
end

ngx_log(ngx_DEBUG, self.log_prefix, "balancer_base created")
return self
end

-- start global renewal timer
renewal_timer = assert(resty_timer({
recurring = true,
interval = 1,
detached = false, -- not anchored, so reloading GC's timer for test purposes
expire = resolve_timer_callback,
}))
-- LuaJIT is smart enough to collect module level vars that are not referenced
-- anymore, hence we add it to the module table as a workaround
_M._renewal_timer = renewal_timer


-- export the error constants
_M.errors = errors
objBalancer.errors = errors
Expand Down