Skip to content

Commit

Permalink
[Fix puma#3282] idle-timeout not waiting on all workers in cluster …
Browse files Browse the repository at this point in the history
…mode
  • Loading branch information
joshuay03 committed Nov 28, 2023
1 parent 32e011a commit cc2394e
Show file tree
Hide file tree
Showing 17 changed files with 70 additions and 55 deletions.
40 changes: 30 additions & 10 deletions lib/puma/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ def all_workers_in_phase?
@workers.all? { |w| w.phase == @phase }
end

def all_workers_idle_timed_out?
(@workers.map(&:pid) - idle_timed_out_worker_pids).empty?
end

def check_workers
return if @next_check >= Time.now

Expand Down Expand Up @@ -417,6 +421,8 @@ def run

@master_read, @worker_write = read, @wakeup

@idle_workers = {}

@config.run_hooks(:before_fork, nil, @log_writer)

spawn_workers
Expand All @@ -432,6 +438,12 @@ def run

while @status == :run
begin
if idle_timed_out = all_workers_idle_timed_out?
log "- All workers reached idle timeout"
Process.kill "SIGTERM", Process.pid
break
end

if @phased_restart
start_phased_restart
@phased_restart = false
Expand All @@ -442,33 +454,33 @@ def run
check_workers

if read.wait_readable([0, @next_check - Time.now].max)
req = read.read_nonblock(1)
req = read.read_nonblock(2)

@next_check = Time.now if req == "!"
next if !req || req == "!"
@next_check = Time.now if req == "!!"
next if !req || req == "!!"

result = read.gets
pid = result.to_i

if req == "b" || req == "f"
if req == "bt" || req == "fk"
pid, idx = result.split(':').map(&:to_i)
w = worker_at idx
w.pid = pid if w.pid.nil?
end

if w = @workers.find { |x| x.pid == pid }
case req
when "b"
when "bt"
w.boot!
log "- Worker #{w.index} (PID: #{pid}) booted in #{w.uptime.round(2)}s, phase: #{w.phase}"
@next_check = Time.now
workers_not_booted -= 1
when "e"
when "ex"
# external term, see worker method, Signal.trap "SIGTERM"
w.term!
when "t"
when "tm"
w.term unless w.term?
when "p"
when "pi"
status = result.sub(/^\d+/,'').chomp
w.ping!(status)
@events.fire(:ping!, w)
Expand All @@ -483,23 +495,27 @@ def run
debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
booted = true
end
when "i1"
@idle_workers[pid] = true
when "i0"
@idle_workers.delete pid
end
else
log "! Out-of-sync worker list, no #{pid} worker"
end
end

if in_phased_restart && workers_not_booted.zero?
@events.fire_on_booted!
debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
in_phased_restart = false
end

rescue Interrupt
@status = :stop
end
end

stop_workers unless @status == :halt
stop_workers unless @status == :halt || idle_timed_out
ensure
@check_pipe.close
@suicide_pipe.close
Expand Down Expand Up @@ -581,5 +597,9 @@ def timeout_workers
end
end
end

def idle_timed_out_worker_pids
@idle_workers.keys
end
end
end
9 changes: 4 additions & 5 deletions lib/puma/cluster/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def initialize(index:, master:, launcher:, pipes:, server: nil)

@index = index
@master = master
@pipes = pipes
@check_pipe = pipes[:check_pipe]
@worker_write = pipes[:worker_write]
@fork_pipe = pipes[:fork_pipe]
Expand Down Expand Up @@ -92,7 +93,7 @@ def run
restart_server << true << false
else # fork worker
worker_pids << pid = spawn_worker(idx)
@worker_write << "f#{pid}:#{idx}\n" rescue nil
@worker_write << "fk#{pid}:#{idx}\n" rescue nil
end
end
end
Expand All @@ -106,7 +107,7 @@ def run
end

begin
@worker_write << "b#{Process.pid}:#{index}\n"
@worker_write << "bt#{Process.pid}:#{index}\n"
rescue SystemCallError, IOError
Puma::Util.purge_interrupt_queue
STDERR.puts "Master seems to have exited, exiting."
Expand All @@ -122,7 +123,7 @@ def run

stat_thread ||= Thread.new(@worker_write) do |io|
Puma.set_thread_name "stat pld"
base_payload = "p#{Process.pid}"
base_payload = "pi#{Process.pid}"

while true
begin
Expand All @@ -146,8 +147,6 @@ def run
# Invoke any worker shutdown hooks so they can prevent the worker
# exiting until any background operations are completed
@config.run_hooks(:before_worker_shutdown, index, @log_writer, @hook_data)

Process.kill "SIGTERM", master if server.idle_timeout_reached
ensure
@worker_write << "t#{Process.pid}\n" rescue nil
@worker_write.close
Expand Down
6 changes: 3 additions & 3 deletions lib/puma/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def initialize(launcher)
def wakeup!
return unless @wakeup

@wakeup.write "!" unless @wakeup.closed?
@wakeup.write "!!" unless @wakeup.closed?

rescue SystemCallError, IOError
Puma::Util.purge_interrupt_queue
Expand Down Expand Up @@ -72,7 +72,7 @@ def start_control

# A Reactor is not created and nio4r is not loaded when 'queue_requests: false'
# Use `nil` for events, no hooks in control server
control = Puma::Server.new app, nil,
control = Puma::Server.new app, nil, nil,
{ min_threads: 0, max_threads: 1, queue_requests: false, log_writer: @log_writer }

begin
Expand Down Expand Up @@ -174,7 +174,7 @@ def app
end

def start_server
server = Puma::Server.new(app, @events, @options)
server = Puma::Server.new(app, @events, @pipes, @options)
server.inherit_binder(@launcher.binder)
server
end
Expand Down
19 changes: 12 additions & 7 deletions lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class Server
attr_reader :events
attr_reader :min_threads, :max_threads # for #stats
attr_reader :requests_count # @version 5.0.0
attr_reader :idle_timeout_reached

# @todo the following may be deprecated in the future
attr_reader :auto_trim_time, :early_hints, :first_data_timeout,
Expand Down Expand Up @@ -66,9 +65,10 @@ class Server
# Often `options` needs to be passed, but `events` does not. Using nil allows
# calling code to not require events.rb.
#
def initialize(app, events = nil, options = {})
def initialize(app, events = nil, pipes = nil, options = {})
@app = app
@events = events || Events.new
@worker_write = pipes[:worker_write] if pipes

@check, @notify = nil
@status = :stop
Expand All @@ -82,6 +82,7 @@ def initialize(app, events = nil, options = {})
UserFileDefaultOptions.new(options, Configuration::DEFAULTS)
end

@clustered = (@options.fetch :workers, 0) > 0
@log_writer = @options.fetch :log_writer, LogWriter.stdio
@early_hints = @options[:early_hints]
@first_data_timeout = @options[:first_data_timeout]
Expand Down Expand Up @@ -118,8 +119,6 @@ def initialize(app, events = nil, options = {})
@precheck_closing = true

@requests_count = 0

@idle_timeout_reached = false
end

def inherit_binder(bind)
Expand Down Expand Up @@ -331,12 +330,18 @@ def handle_servers
begin
ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : @idle_timeout)
unless ios
unless shutting_down?
@idle_timeout_reached = true
if shutting_down?
break
elsif @clustered
@worker_write << "i1#{Process.pid}\n" rescue nil
next
else
@status = :stop
end
end

break
if @clustered
@worker_write << "i0#{Process.pid}\n" rescue nil
end

ios.first.each do |sock|
Expand Down
2 changes: 1 addition & 1 deletion test/test_busy_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def with_server(**options, &app)
options[:max_threads] ||= 10
options[:log_writer] ||= Puma::LogWriter.strings

@server = Puma::Server.new request_handler, nil, **options
@server = Puma::Server.new request_handler, nil, nil, **options
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
end
Expand Down
13 changes: 2 additions & 11 deletions test/test_integration_cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,8 @@ def test_worker_timeout
end

def test_idle_timeout
cli_server "-w #{workers} test/rackup/hello.ru", config: "idle_timeout 1"

get_worker_pids # wait for workers to boot

connect

sleep 1.15

assert_raises Errno::ECONNREFUSED, "Connection refused" do
connect
end
# TODO: Assert that the cluster shuts down when
# idle_timeout is reached on all workers.
end

def test_worker_index_is_with_in_options_limit
Expand Down
2 changes: 1 addition & 1 deletion test/test_integration_ssl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def test_ssl_run_with_curl_client

app = lambda { |_| [200, { 'Content-Type' => 'text/plain' }, ["HELLO", ' ', "THERE"]] }
opts = {max_threads: 1}
server = Puma::Server.new app, nil, opts
server = Puma::Server.new app, nil, nil, opts
if Puma.jruby?
ssl_params = {
'keystore' => File.expand_path('../examples/puma/client-certs/keystore.jks', __dir__),
Expand Down
2 changes: 1 addition & 1 deletion test/test_log_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def test_custom_log_formatter
def test_parse_error
app = proc { |_env| [200, {"Content-Type" => "plain/text"}, ["hello\n"]] }
log_writer = Puma::LogWriter.strings
server = Puma::Server.new app, nil, {log_writer: log_writer}
server = Puma::Server.new app, nil, nil, {log_writer: log_writer}

host = '127.0.0.1'
port = (server.add_tcp_listener host, 0).addr[1]
Expand Down
2 changes: 1 addition & 1 deletion test/test_out_of_band_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def oob_server(**options)
options[:max_threads] ||= 1
options[:log_writer] ||= Puma::LogWriter.strings

@server = Puma::Server.new app, nil, out_of_band: [oob], **options
@server = Puma::Server.new app, nil, nil, out_of_band: [oob], **options
@port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1]
@server.run
sleep 0.15 if Puma.jruby?
Expand Down
2 changes: 1 addition & 1 deletion test/test_persistent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def setup
end

opts = {max_threads: 1}
@server = Puma::Server.new @simple, nil, opts
@server = Puma::Server.new @simple, nil, nil, opts
@port = (@server.add_tcp_listener HOST, 0).addr[1]
@server.run
sleep 0.15 if Puma.jruby?
Expand Down
4 changes: 2 additions & 2 deletions test/test_puma_localhost_authority.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def start_server
app = lambda { |env| [200, {}, [env['rack.url_scheme']]] }

@log_writer = SSLLogWriterHelper.new STDOUT, STDERR
@server = Puma::Server.new app, nil, {log_writer: @log_writer}
@server = Puma::Server.new app, nil, nil, {log_writer: @log_writer}
@server.add_ssl_listener LOCALHOST, 0, nil
@bind_port = @server.connected_ports[0]
@server.run
Expand All @@ -54,7 +54,7 @@ def test_self_signed_by_localhost_authority

@log_writer = SSLLogWriterHelper.new STDOUT, STDERR

@server = Puma::Server.new app, nil, {log_writer: @log_writer}
@server = Puma::Server.new app, nil, nil, {log_writer: @log_writer}
@server.app = app

@server.add_ssl_listener LOCALHOST, 0, nil
Expand Down
8 changes: 4 additions & 4 deletions test/test_puma_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def setup

@log_writer = Puma::LogWriter.strings
@events = Puma::Events.new
@server = Puma::Server.new @app, @events, {log_writer: @log_writer}
@server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer}
end

def teardown
Expand All @@ -44,7 +44,7 @@ def teardown
def server_run(**options, &block)
options[:log_writer] ||= @log_writer
options[:min_threads] ||= 1
@server = Puma::Server.new block || @app, @events, options
@server = Puma::Server.new block || @app, @events, nil, options
@port = (@server.add_tcp_listener @host, 0).addr[1]
@server.run
end
Expand Down Expand Up @@ -630,7 +630,7 @@ def test_no_timeout_after_data_received
end

def test_no_timeout_after_data_received_no_queue
@server = Puma::Server.new @app, @events, {log_writer: @log_writer, queue_requests: false}
@server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer, queue_requests: false}
test_no_timeout_after_data_received
end

Expand Down Expand Up @@ -1566,7 +1566,7 @@ def test_command_ignored_before_run
def test_custom_io_selector
backend = NIO::Selector.backends.first

@server = Puma::Server.new @app, @events, {log_writer: @log_writer, :io_selector_backend => backend}
@server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer, :io_selector_backend => backend}
@server.run

selector = @server.instance_variable_get(:@reactor).instance_variable_get(:@selector)
Expand Down
2 changes: 1 addition & 1 deletion test/test_puma_server_hijack.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def teardown
def server_run(**options, &block)
options[:log_writer] ||= @log_writer
options[:min_threads] ||= 1
@server = Puma::Server.new block || @app, @events, options
@server = Puma::Server.new block || @app, @events, nil, options
@port = (@server.add_tcp_listener @host, 0).addr[1]
@server.run
end
Expand Down
Loading

0 comments on commit cc2394e

Please sign in to comment.