Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use per-scheduler stack pools (let's recycle) #14100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "crystal/system/event_loop"
require "crystal/system/print_error"
require "./fiber_channel"
require "fiber"
require "fiber/stack_pool"
require "crystal/system/thread"

# :nodoc:
Expand All @@ -13,6 +14,11 @@ require "crystal/system/thread"
# protected and must never be called directly.
class Crystal::Scheduler
@event_loop = Crystal::EventLoop.create
@stack_pool = Fiber::StackPool.new

def self.stack_pool : Fiber::StackPool
Thread.current.scheduler.@stack_pool
end

def self.event_loop
Thread.current.scheduler.@event_loop
Expand Down Expand Up @@ -83,15 +89,8 @@ class Crystal::Scheduler
{% end %}
end

{% if flag?(:preview_mt) %}
def self.enqueue_free_stack(stack : Void*) : Nil
Thread.current.scheduler.enqueue_free_stack(stack)
end
{% end %}

{% if flag?(:preview_mt) %}
private getter(fiber_channel : Crystal::FiberChannel) { Crystal::FiberChannel.new }
@free_stacks = Deque(Void*).new
{% end %}

@main : Fiber
Expand Down Expand Up @@ -157,18 +156,6 @@ class Crystal::Scheduler
exit 1
end

{% if flag?(:preview_mt) %}
protected def enqueue_free_stack(stack)
@free_stacks.push stack
end

private def release_free_stacks
while stack = @free_stacks.shift?
Fiber.stack_pool.release stack
end
end
{% end %}

protected def reschedule : Nil
loop do
if runnable = @lock.sync { @runnables.shift? }
Expand All @@ -178,10 +165,6 @@ class Crystal::Scheduler
@event_loop.run_once
end
end

{% if flag?(:preview_mt) %}
release_free_stacks
{% end %}
end

protected def sleep(time : Time::Span) : Nil
Expand All @@ -207,6 +190,8 @@ class Crystal::Scheduler
end

def run_loop
spawn_stack_pool_collector

fiber_channel = self.fiber_channel
loop do
@lock.lock
Expand Down Expand Up @@ -239,7 +224,7 @@ class Crystal::Scheduler
@lock.unlock
end

def self.init_workers
def self.init : Nil
count = worker_count
pending = Atomic(Int32).new(count - 1)
@@workers = Array(Thread).new(count) do |i|
Expand Down Expand Up @@ -281,5 +266,18 @@ class Crystal::Scheduler
4
end
end
{% else %}
def self.init : Nil
{% unless flag?(:interpreted) %}
Thread.current.scheduler.spawn_stack_pool_collector
{% end %}
end
{% end %}

# Background loop to cleanup unused fiber stacks.
def spawn_stack_pool_collector
fiber = Fiber.new(name: "Stack pool collector", &->@stack_pool.collect_loop)
{% if flag?(:preview_mt) %} fiber.set_current_thread {% end %}
enqueue(fiber)
end
end
18 changes: 4 additions & 14 deletions src/fiber.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "crystal/system/thread_linked_list"
require "./fiber/context"
require "./fiber/stack_pool"

# :nodoc:
@[NoInline]
Expand Down Expand Up @@ -47,9 +46,6 @@ class Fiber
# :nodoc:
protected class_getter(fibers) { Thread::LinkedList(Fiber).new }

# :nodoc:
class_getter stack_pool = StackPool.new

@context : Context
@stack : Void*
@resume_event : Crystal::EventLoop::Event?
Expand Down Expand Up @@ -89,10 +85,9 @@ class Fiber
@context = Context.new
@stack, @stack_bottom =
{% if flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{Pointer(Void).null, Pointer(Void).null}
{% else %}
Fiber.stack_pool.checkout
Crystal::Scheduler.stack_pool.checkout
{% end %}

fiber_main = ->(f : Fiber) { f.run }
Expand Down Expand Up @@ -153,14 +148,6 @@ class Fiber
ex.inspect_with_backtrace(STDERR)
STDERR.flush
ensure
{% if flag?(:preview_mt) %}
Crystal::Scheduler.enqueue_free_stack @stack
{% elsif flag?(:interpreted) %}
# For interpreted mode we don't need a new stack, the stack is held by the interpreter
{% else %}
Fiber.stack_pool.release(@stack)
{% end %}

# Remove the current fiber from the linked list
Fiber.inactive(self)

Expand All @@ -170,6 +157,9 @@ class Fiber
@timeout_select_action = nil

@alive = false
{% unless flag?(:interpreted) %}
Crystal::Scheduler.stack_pool.release(@stack)
{% end %}
Crystal::Scheduler.reschedule
end

Expand Down
16 changes: 11 additions & 5 deletions src/fiber/stack_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,36 +7,42 @@ class Fiber

def initialize
@deque = Deque(Void*).new
@mutex = Thread::Mutex.new
end

# Removes and frees at most *count* stacks from the top of the pool,
# returning memory to the operating system.
def collect(count = lazy_size // 2) : Nil
count.times do
if stack = @mutex.synchronize { @deque.shift? }
if stack = @deque.shift?
Crystal::System::Fiber.free_stack(stack, STACK_SIZE)
else
return
end
end
end

def collect_loop(every = 5.seconds) : Nil
loop do
sleep every
collect
end
end

# Removes a stack from the bottom of the pool, or allocates a new one.
def checkout : {Void*, Void*}
stack = @mutex.synchronize { @deque.pop? } || Crystal::System::Fiber.allocate_stack(STACK_SIZE)
stack = @deque.pop? || Crystal::System::Fiber.allocate_stack(STACK_SIZE)
{stack, stack + STACK_SIZE}
end

# Appends a stack to the bottom of the pool.
def release(stack) : Nil
@mutex.synchronize { @deque.push(stack) }
@deque.push(stack)
end

# Returns the approximated size of the pool. It may be equal or slightly
# bigger or smaller than the actual size.
def lazy_size : Int32
@mutex.synchronize { @deque.size }
@deque.size
end
end
end
12 changes: 1 addition & 11 deletions src/kernel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -563,14 +563,6 @@ end
{% end %}

{% unless flag?(:interpreted) || flag?(:wasm32) %}
# Background loop to cleanup unused fiber stacks.
spawn(name: "Fiber Clean Loop") do
loop do
sleep 5
Fiber.stack_pool.collect
end
end

{% if flag?(:win32) %}
Crystal::System::Process.start_interrupt_loop
{% else %}
Expand All @@ -586,7 +578,5 @@ end
Exception::CallStack.load_debug_info if ENV["CRYSTAL_LOAD_DEBUG_INFO"]? == "1"
Exception::CallStack.setup_crash_handler

{% if flag?(:preview_mt) %}
Crystal::Scheduler.init_workers
{% end %}
Crystal::Scheduler.init
{% end %}