Skip to content

Commit

Permalink
POC | CFS with red-black tree
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuay03 committed Jul 29, 2024
1 parent 62f7cd6 commit c761e46
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ end

gem 'm'
gem "localhost", require: false

gem "red-black-tree", github: 'joshuay03/red-black-tree'
11 changes: 11 additions & 0 deletions lib/puma/client_node.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require 'red-black-tree'

module Puma
class ClientNode < ::RedBlackTree::Node
def <=>(other)
data.env.fetch("MIN_VRUNTIME", 0) <=> other.data.env.fetch("MIN_VRUNTIME", 0)
end
end
end
1 change: 0 additions & 1 deletion lib/puma/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ def handle_servers
if sock == check
break if handle_check
else
pool.wait_until_not_full
pool.wait_for_less_busy_worker(options[:wait_for_less_busy_worker])

io = begin
Expand Down
18 changes: 10 additions & 8 deletions lib/puma/thread_pool.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# frozen_string_literal: true

require 'thread'
require 'red-black-tree'

require_relative 'io_buffer'
require_relative 'client_node'

module Puma
# Internal Docs for A simple thread pool management object.
Expand All @@ -12,9 +14,9 @@ module Puma
# First a connection to a client is made in `Puma::Server`. It is wrapped in a
# `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure
# the whole request is buffered into memory. Once the request is ready, it is passed into
# a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
# a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` tree.
#
# Each thread in the pool has an internal loop where it pulls a request from the `@todo` array
# Each thread in the pool has an internal loop where it pulls a request from the `@todo` tree
# and processes it.
class ThreadPool
class ForceShutdown < RuntimeError
Expand All @@ -36,7 +38,7 @@ def initialize(name, options = {}, &block)
@not_full = ConditionVariable.new
@mutex = Mutex.new

@todo = []
@todo = ::RedBlackTree.new

@spawned = 0
@waiting = 0
Expand Down Expand Up @@ -144,7 +146,7 @@ def spawn_thread
end
end

work = todo.shift
work = todo.shift.data
end

if @clean_thread_locals
Expand Down Expand Up @@ -219,14 +221,14 @@ def with_mutex(&block)
@mutex.synchronize(&block)
end

# Add +work+ to the todo list for a Thread to pickup and process.
# Add +work+ to the todo tree for a Thread to pickup and process.
def <<(work)
with_mutex do
if @shutdown
raise "Unable to add work while shutting down"
end

@todo << work
@todo << Puma::ClientNode.new(work)

if @waiting < @todo.size and @spawned < @max
spawn_thread
Expand Down Expand Up @@ -255,9 +257,9 @@ def <<(work)
# signaled, usually this indicates that a request has been processed.
#
# It's important to note that even though the server might accept another
# request, it might not be added to the `@todo` array right away.
# request, it might not be added to the `@todo` tree right away.
# For example if a slow client has only sent a header, but not a body
# then the `@todo` array would stay the same size as the reactor works
# then the `@todo` tree would stay the same size as the reactor works
# to try to buffer the request. In that scenario the next call to this
# method would not block and another request would be added into the reactor
# by the server. This would continue until a fully buffered request
Expand Down

0 comments on commit c761e46

Please sign in to comment.