Skip to content

Commit

Permalink
RFC 2: initialize Fiber with an explicit stack
Browse files Browse the repository at this point in the history
`Fiber::ExecutionContext#spawn` takes advantage of this to take a stack
from the execution context's stack pool, instead of the current
context's stack pool. Cross context spawns thus won't take a stack from
context A but release the stack in context B (where it actually ran)
which would prevent stack recycling.

This also permits to create lots of Fiber instances in specs using a
minimal fake stack, instead of requesting 8MB of virtual memory for each
fiber, despite the fibers never running... which is leaking memory
because we only release the stacks when the fiber terminates.
  • Loading branch information
ysbaddaden committed Feb 4, 2025
1 parent 9455517 commit 2d5afab
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 22 deletions.
16 changes: 8 additions & 8 deletions spec/std/fiber/execution_context/global_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ describe Fiber::ExecutionContext::GlobalQueue do
end

it "#unsafe_push and #unsafe_pop" do
f1 = Fiber.new(name: "f1") { }
f2 = Fiber.new(name: "f2") { }
f3 = Fiber.new(name: "f3") { }
f1 = new_fake_fiber("f1")
f2 = new_fake_fiber("f2")
f3 = new_fake_fiber("f3")

q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
q.unsafe_push(f1)
Expand Down Expand Up @@ -38,7 +38,7 @@ describe Fiber::ExecutionContext::GlobalQueue do

it "grabs fibers" do
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
fibers = 10.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 10.times.map { |i| new_fake_fiber("f#{i}") }.to_a
fibers.each { |f| q.unsafe_push(f) }

runnables = Fiber::ExecutionContext::Runnables(6).new(q)
Expand All @@ -59,7 +59,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
end

it "can't grab more than available" do
f = Fiber.new { }
f = new_fake_fiber
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
q.unsafe_push(f)

Expand All @@ -73,7 +73,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
end

it "clamps divisor to 1" do
f = Fiber.new { }
f = new_fake_fiber
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
q.unsafe_push(f)

Expand All @@ -91,7 +91,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
pending_interpreted describe: "thread safety" do
it "one by one" do
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i|
Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
end

n = 7
Expand Down Expand Up @@ -141,7 +141,7 @@ describe Fiber::ExecutionContext::GlobalQueue do
increments = 15

fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
end

queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
Expand Down
20 changes: 10 additions & 10 deletions spec/std/fiber/execution_context/runnables_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe Fiber::ExecutionContext::Runnables do

describe "#push" do
it "enqueues the fiber in local queue" do
fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a

# local enqueue
g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
Expand All @@ -25,7 +25,7 @@ describe Fiber::ExecutionContext::Runnables do
end

it "moves half the local queue to the global queue on overflow" do
fibers = 5.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 5.times.map { |i| new_fake_fiber("f#{i}") }.to_a

# local enqueue + overflow
g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
Expand All @@ -48,12 +48,12 @@ describe Fiber::ExecutionContext::Runnables do

4.times do
# local
4.times { r.push(Fiber.new { }) }
4.times { r.push(new_fake_fiber) }
2.times { r.shift? }
2.times { r.push(Fiber.new { }) }
2.times { r.push(new_fake_fiber) }

# overflow (2+1 fibers are sent to global queue + 1 local)
2.times { r.push(Fiber.new { }) }
2.times { r.push(new_fake_fiber) }

# clear
3.times { r.shift? }
Expand All @@ -73,7 +73,7 @@ describe Fiber::ExecutionContext::Runnables do
describe "#bulk_push" do
it "fills the local queue" do
l = Fiber::List.new
fibers = 4.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 4.times.map { |i| new_fake_fiber("f#{i}") }.to_a
fibers.each { |f| l.push(f) }

# local enqueue
Expand All @@ -87,7 +87,7 @@ describe Fiber::ExecutionContext::Runnables do

it "pushes the overflow to the global queue" do
l = Fiber::List.new
fibers = 7.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 7.times.map { |i| new_fake_fiber("f#{i}") }.to_a
fibers.each { |f| l.push(f) }

# local enqueue + overflow
Expand Down Expand Up @@ -115,7 +115,7 @@ describe Fiber::ExecutionContext::Runnables do
describe "#steal_from" do
it "steals from another runnables" do
g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
fibers = 6.times.map { |i| Fiber.new(name: "f#{i}") { } }.to_a
fibers = 6.times.map { |i| new_fake_fiber("f#{i}") }.to_a

# fill the source queue
r1 = Fiber::ExecutionContext::Runnables(16).new(g)
Expand Down Expand Up @@ -143,7 +143,7 @@ describe Fiber::ExecutionContext::Runnables do

it "steals the last fiber" do
g = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
lone = Fiber.new(name: "lone") { }
lone = new_fake_fiber("lone")

# fill the source queue
r1 = Fiber::ExecutionContext::Runnables(16).new(g)
Expand Down Expand Up @@ -185,7 +185,7 @@ describe Fiber::ExecutionContext::Runnables do
# less fibers than space in runnables (so threads can starve)
# 54 is roughly half of 16 × 7 and can be divided by 9 (for batch enqueues below)
fibers = Array(Fiber::ExecutionContext::FiberCounter).new(54) do |i|
Fiber::ExecutionContext::FiberCounter.new(Fiber.new(name: "f#{i}") { })
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
end

global_queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
Expand Down
16 changes: 16 additions & 0 deletions spec/std/fiber/execution_context/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@ require "crystal/system/thread_wait_group"
require "fiber/execution_context/runnables"
require "fiber/execution_context/global_queue"

# fake stack for `makecontext` to have somewhere to write in #initialize.
# we don't actually run the fiber.
FAKE_FIBER_STACK = GC.malloc(32)

def new_fake_fiber(name = nil)
stack = FAKE_FIBER_STACK
stack_bottom = FAKE_FIBER_STACK + 32

{% if flag?(:execution_context) %}
execution_context = Fiber::ExecutionContext.current
Fiber.new(name, stack, stack_bottom, execution_context) { }
{% else %}
Fiber.new(name, stack, stack_bottom) { }
{% end %}
end

module Fiber::ExecutionContext
class FiberCounter
def initialize(@fiber : Fiber)
Expand Down
11 changes: 8 additions & 3 deletions src/fiber.cr
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,21 @@ class Fiber
# When the fiber is executed, it runs *proc* in its context.
#
# *name* is an optional and used only as an internal reference.
def initialize(@name : String? = nil, {% if flag?(:execution_context) %}@execution_context : ExecutionContext = ExecutionContext.current,{% end %} &@proc : ->)
@context = Context.new
@stack, @stack_bottom =
def self.new(name : String? = nil, {% if flag?(:execution_context) %}execution_context : ExecutionContext = ExecutionContext.current,{% end %} &proc : ->) : self
stack, stack_bottom =
{% if flag?(:interpreted) %}
{Pointer(Void).null, Pointer(Void).null}
{% elsif flag?(:execution_context) %}
execution_context.stack_pool.checkout
{% else %}
Crystal::Scheduler.stack_pool.checkout
{% end %}
new(name, stack, stack_bottom, {% if flag?(:execution_context) %}execution_context,{% end %} &proc)
end

# :nodoc:
def initialize(@name : String?, @stack : Void*, @stack_bottom : Void*, {% if flag?(:execution_context) %}@execution_context : ExecutionContext,{% end %} &@proc : ->)
@context = Context.new

fiber_main = ->(f : Fiber) { f.run }

Expand Down
10 changes: 9 additions & 1 deletion src/fiber/execution_context.cr
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,15 @@ module Fiber::ExecutionContext
#
# May be called from any `ExecutionContext` (i.e. must be thread-safe).
def spawn(*, name : String? = nil, &block : ->) : Fiber
Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) }
stack, stack_bottom =
{% if flag?(:interpreted) %}
{Pointer(Void).null, Pointer(Void).null}
{% else %}
stack_pool.checkout
{% end %}
fiber = Fiber.new(name, stack, stack_bottom, self, &block)
enqueue(fiber)
fiber
end

# :nodoc:
Expand Down

0 comments on commit 2d5afab

Please sign in to comment.