From 0b11e3e1860531ca6b8138aa03691a0925d9a334 Mon Sep 17 00:00:00 2001 From: Joshua Young Date: Tue, 28 Nov 2023 15:44:08 +1000 Subject: [PATCH] [Fix #3282] `idle-timeout` not waiting on all workers in cluster mode --- lib/puma/cluster.rb | 37 ++++++++++++++++++++------- lib/puma/cluster/worker.rb | 13 +++++----- lib/puma/dsl.rb | 11 +++++++- lib/puma/runner.rb | 6 ++--- lib/puma/server.rb | 19 +++++++++----- test/test_busy_worker.rb | 2 +- test/test_integration_cluster.rb | 12 ++++++--- test/test_integration_ssl.rb | 2 +- test/test_log_writer.rb | 2 +- test/test_out_of_band_server.rb | 2 +- test/test_persistent.rb | 2 +- test/test_puma_localhost_authority.rb | 4 +-- test/test_puma_server.rb | 8 +++--- test/test_puma_server_hijack.rb | 2 +- test/test_puma_server_ssl.rb | 8 +++--- test/test_request_invalid.rb | 2 +- test/test_response_header.rb | 2 +- test/test_web_server.rb | 2 +- 18 files changed, 88 insertions(+), 48 deletions(-) diff --git a/lib/puma/cluster.rb b/lib/puma/cluster.rb index 59fabb95b3..9604e31bf0 100644 --- a/lib/puma/cluster.rb +++ b/lib/puma/cluster.rb @@ -158,6 +158,10 @@ def all_workers_in_phase? @workers.all? { |w| w.phase == @phase } end + def all_workers_idle_timed_out? + (@workers.map(&:pid) - idle_timed_out_worker_pids).empty? + end + def check_workers return if @next_check >= Time.now @@ -417,6 +421,8 @@ def run @master_read, @worker_write = read, @wakeup + @idle_workers = {} + @config.run_hooks(:before_fork, nil, @log_writer) spawn_workers @@ -432,6 +438,11 @@ def run while @status == :run begin + if idle_timed_out = all_workers_idle_timed_out? + log "- All workers reached idle timeout" + break + end + if @phased_restart start_phased_restart @phased_restart = false @@ -442,15 +453,15 @@ def run check_workers if read.wait_readable([0, @next_check - Time.now].max) - req = read.read_nonblock(1) + req = read.read_nonblock(2) - @next_check = Time.now if req == "!" - next if !req || req == "!" + @next_check = Time.now if req == "!!" + next if !req || req == "!!" result = read.gets pid = result.to_i - if req == "b" || req == "f" + if req == "bt" || req == "fk" pid, idx = result.split(':').map(&:to_i) w = worker_at idx w.pid = pid if w.pid.nil? @@ -458,17 +469,17 @@ def run if w = @workers.find { |x| x.pid == pid } case req - when "b" + when "bt" w.boot! log "- Worker #{w.index} (PID: #{pid}) booted in #{w.uptime.round(2)}s, phase: #{w.phase}" @next_check = Time.now workers_not_booted -= 1 - when "e" + when "ex" # external term, see worker method, Signal.trap "SIGTERM" w.term! - when "t" + when "tm" w.term unless w.term? - when "p" + when "pi" status = result.sub(/^\d+/,'').chomp w.ping!(status) @events.fire(:ping!, w) @@ -483,17 +494,21 @@ def run debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug? booted = true end + when "i1" + @idle_workers[pid] = true + when "i0" + @idle_workers.delete pid end else log "! Out-of-sync worker list, no #{pid} worker" end end + if in_phased_restart && workers_not_booted.zero? @events.fire_on_booted! debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug? in_phased_restart = false end - rescue Interrupt @status = :stop end @@ -581,5 +596,9 @@ def timeout_workers end end end + + def idle_timed_out_worker_pids + @idle_workers.keys + end end end diff --git a/lib/puma/cluster/worker.rb b/lib/puma/cluster/worker.rb index face057687..a3e54ce83f 100644 --- a/lib/puma/cluster/worker.rb +++ b/lib/puma/cluster/worker.rb @@ -19,6 +19,7 @@ def initialize(index:, master:, launcher:, pipes:, server: nil) @index = index @master = master + @pipes = pipes @check_pipe = pipes[:check_pipe] @worker_write = pipes[:worker_write] @fork_pipe = pipes[:fork_pipe] @@ -92,21 +93,21 @@ def run restart_server << true << false else # fork worker worker_pids << pid = spawn_worker(idx) - @worker_write << "f#{pid}:#{idx}\n" rescue nil + @worker_write << "fk#{pid}:#{idx}\n" rescue nil end end end end Signal.trap "SIGTERM" do - @worker_write << "e#{Process.pid}\n" rescue nil + @worker_write << "ex#{Process.pid}\n" rescue nil restart_server.clear server.stop restart_server << false end begin - @worker_write << "b#{Process.pid}:#{index}\n" + @worker_write << "bt#{Process.pid}:#{index}\n" rescue SystemCallError, IOError Puma::Util.purge_interrupt_queue STDERR.puts "Master seems to have exited, exiting." @@ -122,7 +123,7 @@ def run stat_thread ||= Thread.new(@worker_write) do |io| Puma.set_thread_name "stat pld" - base_payload = "p#{Process.pid}" + base_payload = "pi#{Process.pid}" while true begin @@ -146,10 +147,8 @@ def run # Invoke any worker shutdown hooks so they can prevent the worker # exiting until any background operations are completed @config.run_hooks(:before_worker_shutdown, index, @log_writer, @hook_data) - - Process.kill "SIGTERM", master if server.idle_timeout_reached ensure - @worker_write << "t#{Process.pid}\n" rescue nil + @worker_write << "tm#{Process.pid}\n" rescue nil @worker_write.close end diff --git a/lib/puma/dsl.rb b/lib/puma/dsl.rb index 57c699045b..436862bd89 100644 --- a/lib/puma/dsl.rb +++ b/lib/puma/dsl.rb @@ -330,7 +330,16 @@ def persistent_timeout(seconds) # If a new request is not received within this number of seconds, begin shutting down. # @see Puma::Server.new def idle_timeout(seconds) - @options[:idle_timeout] = Integer(seconds) + timeout = Integer(seconds) + + if (@options.fetch(:workers, 0)) > 0 + min = @options.fetch(:worker_check_interval, Configuration::DEFAULTS[:worker_check_interval]) * 2 + if timeout < min + raise "The minimum idle_timeout must at least twice the worker reporting interval (#{min})" + end + end + + @options[:idle_timeout] = timeout end # Work around leaky apps that leave garbage in Thread locals diff --git a/lib/puma/runner.rb b/lib/puma/runner.rb index a22c5b08a2..2b407649f0 100644 --- a/lib/puma/runner.rb +++ b/lib/puma/runner.rb @@ -27,7 +27,7 @@ def initialize(launcher) def wakeup! return unless @wakeup - @wakeup.write "!" unless @wakeup.closed? + @wakeup.write "!!" unless @wakeup.closed? rescue SystemCallError, IOError Puma::Util.purge_interrupt_queue @@ -72,7 +72,7 @@ def start_control # A Reactor is not created and nio4r is not loaded when 'queue_requests: false' # Use `nil` for events, no hooks in control server - control = Puma::Server.new app, nil, + control = Puma::Server.new app, nil, nil, { min_threads: 0, max_threads: 1, queue_requests: false, log_writer: @log_writer } begin @@ -174,7 +174,7 @@ def app end def start_server - server = Puma::Server.new(app, @events, @options) + server = Puma::Server.new(app, @events, @pipes, @options) server.inherit_binder(@launcher.binder) server end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index e282fdd182..91df49d0a0 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -37,7 +37,6 @@ class Server attr_reader :events attr_reader :min_threads, :max_threads # for #stats attr_reader :requests_count # @version 5.0.0 - attr_reader :idle_timeout_reached # @todo the following may be deprecated in the future attr_reader :auto_trim_time, :early_hints, :first_data_timeout, @@ -66,9 +65,10 @@ class Server # Often `options` needs to be passed, but `events` does not. Using nil allows # calling code to not require events.rb. # - def initialize(app, events = nil, options = {}) + def initialize(app, events = nil, pipes = nil, options = {}) @app = app @events = events || Events.new + @worker_write = pipes[:worker_write] if pipes @check, @notify = nil @status = :stop @@ -82,6 +82,7 @@ def initialize(app, events = nil, options = {}) UserFileDefaultOptions.new(options, Configuration::DEFAULTS) end + @clustered = (@options.fetch :workers, 0) > 0 @log_writer = @options.fetch :log_writer, LogWriter.stdio @early_hints = @options[:early_hints] @first_data_timeout = @options[:first_data_timeout] @@ -118,8 +119,6 @@ def initialize(app, events = nil, options = {}) @precheck_closing = true @requests_count = 0 - - @idle_timeout_reached = false end def inherit_binder(bind) @@ -332,13 +331,21 @@ def handle_servers ios = IO.select sockets, nil, nil, (shutting_down? ? 0 : @idle_timeout) unless ios unless shutting_down? - @idle_timeout_reached = true - @status = :stop + if @clustered + @worker_write << "i1#{Process.pid}\n" rescue nil + next + else + @status = :stop + end end break end + if @clustered + @worker_write << "i0#{Process.pid}\n" rescue nil + end + ios.first.each do |sock| if sock == check break if handle_check diff --git a/test/test_busy_worker.rb b/test/test_busy_worker.rb index 11973a5198..3907aed094 100644 --- a/test/test_busy_worker.rb +++ b/test/test_busy_worker.rb @@ -56,7 +56,7 @@ def with_server(**options, &app) options[:max_threads] ||= 10 options[:log_writer] ||= Puma::LogWriter.strings - @server = Puma::Server.new request_handler, nil, **options + @server = Puma::Server.new request_handler, nil, nil, **options @port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1] @server.run end diff --git a/test/test_integration_cluster.rb b/test/test_integration_cluster.rb index 57dd7613bd..52ee68b9a4 100644 --- a/test/test_integration_cluster.rb +++ b/test/test_integration_cluster.rb @@ -222,13 +222,19 @@ def test_worker_timeout end def test_idle_timeout - cli_server "-w #{workers} test/rackup/hello.ru", config: "idle_timeout 1" + cli_server "-w #{workers} test/rackup/hello.ru", config: <<~RUBY + worker_check_interval 1 + idle_timeout 2 + RUBY get_worker_pids # wait for workers to boot - connect + 10.times { + fast_connect + sleep 1 + } - sleep 1.15 + sleep 3 assert_raises Errno::ECONNREFUSED, "Connection refused" do connect diff --git a/test/test_integration_ssl.rb b/test/test_integration_ssl.rb index 88917fe7f7..7f58aa4f37 100644 --- a/test/test_integration_ssl.rb +++ b/test/test_integration_ssl.rb @@ -162,7 +162,7 @@ def test_ssl_run_with_curl_client app = lambda { |_| [200, { 'Content-Type' => 'text/plain' }, ["HELLO", ' ', "THERE"]] } opts = {max_threads: 1} - server = Puma::Server.new app, nil, opts + server = Puma::Server.new app, nil, nil, opts if Puma.jruby? ssl_params = { 'keystore' => File.expand_path('../examples/puma/client-certs/keystore.jks', __dir__), diff --git a/test/test_log_writer.rb b/test/test_log_writer.rb index f62bfd2bdb..8248486a98 100644 --- a/test/test_log_writer.rb +++ b/test/test_log_writer.rb @@ -125,7 +125,7 @@ def test_custom_log_formatter def test_parse_error app = proc { |_env| [200, {"Content-Type" => "plain/text"}, ["hello\n"]] } log_writer = Puma::LogWriter.strings - server = Puma::Server.new app, nil, {log_writer: log_writer} + server = Puma::Server.new app, nil, nil, {log_writer: log_writer} host = '127.0.0.1' port = (server.add_tcp_listener host, 0).addr[1] diff --git a/test/test_out_of_band_server.rb b/test/test_out_of_band_server.rb index 5f20b02da2..7c3315eb17 100644 --- a/test/test_out_of_band_server.rb +++ b/test/test_out_of_band_server.rb @@ -70,7 +70,7 @@ def oob_server(**options) options[:max_threads] ||= 1 options[:log_writer] ||= Puma::LogWriter.strings - @server = Puma::Server.new app, nil, out_of_band: [oob], **options + @server = Puma::Server.new app, nil, nil, out_of_band: [oob], **options @port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1] @server.run sleep 0.15 if Puma.jruby? diff --git a/test/test_persistent.rb b/test/test_persistent.rb index 08568ef318..f56fe256f2 100644 --- a/test/test_persistent.rb +++ b/test/test_persistent.rb @@ -24,7 +24,7 @@ def setup end opts = {max_threads: 1} - @server = Puma::Server.new @simple, nil, opts + @server = Puma::Server.new @simple, nil, nil, opts @port = (@server.add_tcp_listener HOST, 0).addr[1] @server.run sleep 0.15 if Puma.jruby? diff --git a/test/test_puma_localhost_authority.rb b/test/test_puma_localhost_authority.rb index bac39a1b00..c37b3866e5 100644 --- a/test/test_puma_localhost_authority.rb +++ b/test/test_puma_localhost_authority.rb @@ -28,7 +28,7 @@ def start_server app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } @log_writer = SSLLogWriterHelper.new STDOUT, STDERR - @server = Puma::Server.new app, nil, {log_writer: @log_writer} + @server = Puma::Server.new app, nil, nil, {log_writer: @log_writer} @server.add_ssl_listener LOCALHOST, 0, nil @bind_port = @server.connected_ports[0] @server.run @@ -54,7 +54,7 @@ def test_self_signed_by_localhost_authority @log_writer = SSLLogWriterHelper.new STDOUT, STDERR - @server = Puma::Server.new app, nil, {log_writer: @log_writer} + @server = Puma::Server.new app, nil, nil, {log_writer: @log_writer} @server.app = app @server.add_ssl_listener LOCALHOST, 0, nil diff --git a/test/test_puma_server.rb b/test/test_puma_server.rb index 339f295d72..03b66e6c83 100644 --- a/test/test_puma_server.rb +++ b/test/test_puma_server.rb @@ -24,7 +24,7 @@ def setup @log_writer = Puma::LogWriter.strings @events = Puma::Events.new - @server = Puma::Server.new @app, @events, {log_writer: @log_writer} + @server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer} end def teardown @@ -44,7 +44,7 @@ def teardown def server_run(**options, &block) options[:log_writer] ||= @log_writer options[:min_threads] ||= 1 - @server = Puma::Server.new block || @app, @events, options + @server = Puma::Server.new block || @app, @events, nil, options @port = (@server.add_tcp_listener @host, 0).addr[1] @server.run end @@ -630,7 +630,7 @@ def test_no_timeout_after_data_received end def test_no_timeout_after_data_received_no_queue - @server = Puma::Server.new @app, @events, {log_writer: @log_writer, queue_requests: false} + @server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer, queue_requests: false} test_no_timeout_after_data_received end @@ -1566,7 +1566,7 @@ def test_command_ignored_before_run def test_custom_io_selector backend = NIO::Selector.backends.first - @server = Puma::Server.new @app, @events, {log_writer: @log_writer, :io_selector_backend => backend} + @server = Puma::Server.new @app, @events, nil, {log_writer: @log_writer, :io_selector_backend => backend} @server.run selector = @server.instance_variable_get(:@reactor).instance_variable_get(:@selector) diff --git a/test/test_puma_server_hijack.rb b/test/test_puma_server_hijack.rb index 30f48b4354..3744aa028b 100644 --- a/test/test_puma_server_hijack.rb +++ b/test/test_puma_server_hijack.rb @@ -49,7 +49,7 @@ def teardown def server_run(**options, &block) options[:log_writer] ||= @log_writer options[:min_threads] ||= 1 - @server = Puma::Server.new block || @app, @events, options + @server = Puma::Server.new block || @app, @events, nil, options @port = (@server.add_tcp_listener @host, 0).addr[1] @server.run end diff --git a/test/test_puma_server_ssl.rb b/test/test_puma_server_ssl.rb index cfe56c2407..0578e4e65b 100644 --- a/test/test_puma_server_ssl.rb +++ b/test/test_puma_server_ssl.rb @@ -62,7 +62,7 @@ def start_server yield ctx if block_given? @log_writer = SSLLogWriterHelper.new STDOUT, STDERR - @server = Puma::Server.new app, nil, {log_writer: @log_writer} + @server = Puma::Server.new app, nil, nil, {log_writer: @log_writer} @port = (@server.add_ssl_listener HOST, 0, ctx).addr[1] @bind_port = @port @server.run @@ -279,7 +279,7 @@ def assert_ssl_client_error_match(error, subject: nil, context: CTX, &blk) app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } log_writer = SSLLogWriterHelper.new STDOUT, STDERR - server = Puma::Server.new app, nil, {log_writer: log_writer} + server = Puma::Server.new app, nil, nil, {log_writer: log_writer} server.add_ssl_listener LOCALHOST, port, context host_addrs = server.binder.ios.map { |io| io.to_io.addr[2] } @bind_port = server.connected_ports[0] @@ -484,7 +484,7 @@ def test_server_ssl_with_cert_pem_and_key_pem app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } log_writer = SSLLogWriterHelper.new STDOUT, STDERR - server = Puma::Server.new app, nil, {log_writer: log_writer} + server = Puma::Server.new app, nil, nil, {log_writer: log_writer} server.add_ssl_listener LOCALHOST, 0, ctx @bind_port = server.connected_ports[0] server.run @@ -526,7 +526,7 @@ def cert_chain(&blk) app = lambda { |env| [200, {}, [env['rack.url_scheme']]] } @log_writer = SSLLogWriterHelper.new STDOUT, STDERR - @server = Puma::Server.new app, nil, {log_writer: @log_writer} + @server = Puma::Server.new app, nil, nil, {log_writer: @log_writer} mini_ctx = Puma::MiniSSL::Context.new mini_ctx.key = "#{CHAIN_DIR}/cert.key" diff --git a/test/test_request_invalid.rb b/test/test_request_invalid.rb index 6efdbeffb4..97d8e9ff9d 100644 --- a/test/test_request_invalid.rb +++ b/test/test_request_invalid.rb @@ -35,7 +35,7 @@ def setup } @log_writer = Puma::LogWriter.strings - @server = Puma::Server.new app, nil, {log_writer: @log_writer} + @server = Puma::Server.new app, nil, nil, {log_writer: @log_writer} @port = (@server.add_tcp_listener @host, 0).addr[1] @server.run sleep 0.15 if Puma.jruby? diff --git a/test/test_response_header.rb b/test/test_response_header.rb index 2f33c846c6..d167c3bd56 100644 --- a/test/test_response_header.rb +++ b/test/test_response_header.rb @@ -17,7 +17,7 @@ def setup @app = ->(env) { [200, {}, [env['rack.url_scheme']]] } @log_writer = Puma::LogWriter.strings - @server = Puma::Server.new @app, ::Puma::Events.new, {log_writer: @log_writer, min_threads: 1} + @server = Puma::Server.new @app, ::Puma::Events.new, nil, {log_writer: @log_writer, min_threads: 1} end def teardown diff --git a/test/test_web_server.rb b/test/test_web_server.rb index aadcf26f9b..6ba8ef5d63 100644 --- a/test/test_web_server.rb +++ b/test/test_web_server.rb @@ -23,7 +23,7 @@ class WebServerTest < Minitest::Test def setup @tester = TestHandler.new - @server = Puma::Server.new @tester, nil, {log_writer: Puma::LogWriter.strings} + @server = Puma::Server.new @tester, nil, nil, {log_writer: Puma::LogWriter.strings} @port = (@server.add_tcp_listener "127.0.0.1", 0).addr[1] @tcp = "http://127.0.0.1:#{@port}" @server.run