Skip to content

Commit

Permalink
[Fix puma#3282] idle-timeout not waiting on all workers in cluster …
Browse files Browse the repository at this point in the history
…mode
  • Loading branch information
joshuay03 committed Nov 28, 2023
1 parent 32e011a commit d7e69fa
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 35 deletions.
40 changes: 30 additions & 10 deletions lib/puma/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ def all_workers_in_phase?
@workers.all? { |w| w.phase == @phase }
end

def all_workers_idle_timed_out?
(@workers.map(&:pid) - idle_timed_out_worker_pids).empty?
end

def check_workers
return if @next_check >= Time.now

Expand Down Expand Up @@ -417,6 +421,8 @@ def run

@master_read, @worker_write = read, @wakeup

@idle_workers = {}

@config.run_hooks(:before_fork, nil, @log_writer)

spawn_workers
Expand All @@ -432,6 +438,12 @@ def run

while @status == :run
begin
if idle_timed_out = all_workers_idle_timed_out?
log "- All workers reached idle timeout"
Process.kill "SIGTERM", Process.pid
break
end

if @phased_restart
start_phased_restart
@phased_restart = false
Expand All @@ -442,33 +454,33 @@ def run
check_workers

if read.wait_readable([0, @next_check - Time.now].max)
req = read.read_nonblock(1)
req = read.read_nonblock(2)

@next_check = Time.now if req == "!"
next if !req || req == "!"
@next_check = Time.now if req == "!!"
next if !req || req == "!!"

result = read.gets
pid = result.to_i

if req == "b" || req == "f"
if req == "bt" || req == "fk"
pid, idx = result.split(':').map(&:to_i)
w = worker_at idx
w.pid = pid if w.pid.nil?
end

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
when "bt"
w.boot!
log "- Worker #{w.index} (PID: #{pid}) booted in #{w.uptime.round(2)}s, phase: #{w.phase}"
@next_check = Time.now
workers_not_booted -= 1
when "e"
when "ex"
# external term, see worker method, Signal.trap "SIGTERM"
w.term!
when "t"
when "tm"
w.term unless w.term?
when "p"
when "pi"
status = result.sub(/^\d+/,'').chomp
w.ping!(status)
@events.fire(:ping!, w)
Expand All @@ -483,23 +495,27 @@ def run
debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
booted = true
end
when "i1"
@idle_workers[pid] = true
when "i0"
@idle_workers.delete pid
end
else
log "! Out-of-sync worker list, no #{pid} worker"
end
end

if in_phased_restart && workers_not_booted.zero?
@events.fire_on_booted!
debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
in_phased_restart = false
end

rescue Interrupt
@status = :stop
end
end

stop_workers unless @status == :halt
stop_workers unless @status == :halt || idle_timed_out
ensure
@check_pipe.close
@suicide_pipe.close
Expand Down Expand Up @@ -581,5 +597,9 @@ def timeout_workers
end
end
end

def idle_timed_out_worker_pids
@idle_workers.keys
end
end
end
9 changes: 4 additions & 5 deletions lib/puma/cluster/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def initialize(index:, master:, launcher:, pipes:, server: nil)

@index = index
@master = master
@pipes = pipes
@check_pipe = pipes[:check_pipe]
@worker_write = pipes[:worker_write]
@fork_pipe = pipes[:fork_pipe]
Expand Down Expand Up @@ -92,7 +93,7 @@ def run
restart_server << true << false
else # fork worker
worker_pids << pid = spawn_worker(idx)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
@worker_write << "fk#{pid}:#{idx}\n" rescue nil
end
end
end
Expand All @@ -106,7 +107,7 @@ def run
end

begin
@worker_write << "b#{Process.pid}:#{index}\n"
@worker_write << "bt#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Puma::Util.purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
Expand All @@ -122,7 +123,7 @@ def run

stat_thread ||= Thread.new(@worker_write) do |io|
Puma.set_thread_name "stat pld"
base_payload = "p#{Process.pid}"
base_payload = "pi#{Process.pid}"

while true
begin
Expand All @@ -146,8 +147,6 @@ def run
# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
@config.run_hooks(:before_worker_shutdown, index, @log_writer, @hook_data)

Process.kill "SIGTERM", master if server.idle_timeout_reached
ensure
@worker_write << "t#{Process.pid}\n" rescue nil
@worker_write.close
Expand Down
4 changes: 2 additions & 2 deletions lib/puma/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def initialize(launcher)
def wakeup!
return unless @wakeup

@wakeup.write "!" unless @wakeup.closed?
@wakeup.write "!!" unless @wakeup.closed?

rescue SystemCallError, IOError
Puma::Util.purge_interrupt_queue
Expand Down Expand Up @@ -174,7 +174,7 @@ def app
end

def start_server
server = Puma::Server.new(app, @events, @options)
server = Puma::Server.new(app, @events, @pipes, @options)
server.inherit_binder(@launcher.binder)
server
end
Expand Down
19 changes: 12 additions & 7 deletions lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Server
attr_reader :events
attr_reader :min_threads, :max_threads # for #stats
attr_reader :requests_count # @version 5.0.0
attr_reader :idle_timeout_reached

# @todo the following may be deprecated in the future
attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
Expand Down Expand Up @@ -66,9 +65,10 @@ class Server
# Often `options` needs to be passed, but `events` does not. Using nil allows
# calling code to not require events.rb.
#
def initialize(app, events = nil, options = {})
def initialize(app, events = nil, pipes = nil, options = {})
@app = app
@events = events || Events.new
@worker_write = pipes[:worker_write] if pipes

@check, @notify = nil
@status = :stop
Expand All @@ -82,6 +82,7 @@ def initialize(app, events = nil, options = {})
UserFileDefaultOptions.new(options, Configuration::DEFAULTS)
end

@clustered = (@options.fetch :workers, 0) > 0
@log_writer = @options.fetch :log_writer, LogWriter.stdio
@early_hints = @options[:early_hints]
@first_data_timeout = @options[:first_data_timeout]
Expand Down Expand Up @@ -118,8 +119,6 @@ def initialize(app, events = nil, options = {})
@precheck_closing = true

@requests_count = 0

@idle_timeout_reached = false
end

def inherit_binder(bind)
Expand Down Expand Up @@ -331,12 +330,18 @@ def handle_servers
begin
ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : @idle_timeout)
unless ios
unless shutting_down?
@idle_timeout_reached = true
if shutting_down?
break
elsif @clustered
@worker_write << "i1#{Process.pid}\n" rescue nil
next
else
@status = :stop
end
end

break
if @clustered
@worker_write << "i0#{Process.pid}\n" rescue nil
end

ios.first.each do |sock|
Expand Down
13 changes: 2 additions & 11 deletions test/test_integration_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,8 @@ def test_worker_timeout
end

def test_idle_timeout
cli_server "-w #{workers} test/rackup/hello.ru", config: "idle_timeout 1"

get_worker_pids # wait for workers to boot

connect

sleep 1.15

assert_raises Errno::ECONNREFUSED, "Connection refused" do
connect
end
# TODO: Assert that the cluster shuts down when
# idle_timeout is reached on all workers.
end

def test_worker_index_is_with_in_options_limit
Expand Down

0 comments on commit d7e69fa

Please sign in to comment.