From c761e46938d6f3af3e48d13ffa8e6453a7870e63 Mon Sep 17 00:00:00 2001 From: Joshua Young Date: Tue, 16 Jul 2024 17:48:50 +0530 Subject: [PATCH] POC | CFS with red-black tree --- Gemfile | 2 ++ lib/puma/client_node.rb | 11 +++++++++++ lib/puma/server.rb | 1 - lib/puma/thread_pool.rb | 18 ++++++++++-------- 4 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 lib/puma/client_node.rb diff --git a/Gemfile b/Gemfile index 98ab9a7c36..79705a82b1 100644 --- a/Gemfile +++ b/Gemfile @@ -39,3 +39,5 @@ end gem 'm' gem "localhost", require: false + +gem "red-black-tree", github: 'joshuay03/red-black-tree' diff --git a/lib/puma/client_node.rb b/lib/puma/client_node.rb new file mode 100644 index 0000000000..01b598fb6e --- /dev/null +++ b/lib/puma/client_node.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require 'red-black-tree' + +module Puma + class ClientNode < ::RedBlackTree::Node + def <=>(other) + data.env.fetch("MIN_VRUNTIME", 0) <=> other.data.env.fetch("MIN_VRUNTIME", 0) + end + end +end diff --git a/lib/puma/server.rb b/lib/puma/server.rb index 50e0282fcb..b76801b27d 100644 --- a/lib/puma/server.rb +++ b/lib/puma/server.rb @@ -359,7 +359,6 @@ def handle_servers if sock == check break if handle_check else - pool.wait_until_not_full pool.wait_for_less_busy_worker(options[:wait_for_less_busy_worker]) io = begin diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index cc39bd3e9a..f9c7b83e2a 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -1,8 +1,10 @@ # frozen_string_literal: true require 'thread' +require 'red-black-tree' require_relative 'io_buffer' +require_relative 'client_node' module Puma # Internal Docs for A simple thread pool management object. @@ -12,9 +14,9 @@ module Puma # First a connection to a client is made in `Puma::Server`. It is wrapped in a # `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure # the whole request is buffered into memory. Once the request is ready, it is passed into - # a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array. + # a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` tree. # - # Each thread in the pool has an internal loop where it pulls a request from the `@todo` array + # Each thread in the pool has an internal loop where it pulls a request from the `@todo` tree # and processes it. class ThreadPool class ForceShutdown < RuntimeError @@ -36,7 +38,7 @@ def initialize(name, options = {}, &block) @not_full = ConditionVariable.new @mutex = Mutex.new - @todo = [] + @todo = ::RedBlackTree.new @spawned = 0 @waiting = 0 @@ -144,7 +146,7 @@ def spawn_thread end end - work = todo.shift + work = todo.shift.data end if @clean_thread_locals @@ -219,14 +221,14 @@ def with_mutex(&block) @mutex.synchronize(&block) end - # Add +work+ to the todo list for a Thread to pickup and process. + # Add +work+ to the todo tree for a Thread to pickup and process. def <<(work) with_mutex do if @shutdown raise "Unable to add work while shutting down" end - @todo << work + @todo << Puma::ClientNode.new(work) if @waiting < @todo.size and @spawned < @max spawn_thread @@ -255,9 +257,9 @@ def <<(work) # signaled, usually this indicates that a request has been processed. # # It's important to note that even though the server might accept another - # request, it might not be added to the `@todo` array right away. + # request, it might not be added to the `@todo` tree right away. # For example if a slow client has only sent a header, but not a body - # then the `@todo` array would stay the same size as the reactor works + # then the `@todo` tree would stay the same size as the reactor works # to try to buffer the request. In that scenario the next call to this # method would not block and another request would be added into the reactor # by the server. This would continue until a fully buffered request