Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fiber context switch, Stack pools, Safe select #7407

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion spec/std/concurrent/select_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ private def yield_to(fiber)
end

describe "select" do
it "select many receviers" do
it "select many receivers" do
ch1 = Channel(Int32).new
ch2 = Channel(Int32).new
res = [] of Int32
Expand Down Expand Up @@ -72,6 +72,35 @@ describe "select" do
res.should eq (0...10).to_a
end

# Verifies that a `select` with an `else` branch falls through to `else` when
# no clause is ready, and that the channels involved remain fully usable
# afterwards (sends, receives and close all behave normally).
it "select else clause and cancel other clauses" do
ch1 = Channel::Buffered(Int32).new(1)
ch2 = Channel::Buffered(Int32).new(1)

# Nothing has been sent on either channel yet, so the expectation below is
# that neither receive clause fires and the `else` branch assigns -1.
select
when ch1.receive
got = 1
when ch2.receive
got = 2
else
got = -1
end

got.should eq(-1)

# Producer fiber: sends one value on each channel, then closes both.
spawn do
ch1.send(1)
ch2.send(2)
ch1.close
ch2.close
end

# Each channel must deliver exactly its one value and then report closed
# (`receive?` returns nil) -- no value may be lost to the earlier select.
ch1.receive.should eq(1)
ch1.receive?.should be_nil

ch2.receive.should eq(2)
ch2.receive?.should be_nil
end

it "select should work with send which started before receive, fixed #3862" do
ch1 = Channel(Int32).new
ch2 = Channel(Int32).new
Expand Down Expand Up @@ -100,4 +129,23 @@ describe "select" do
sleep
x.should eq 1
end

# Regression spec for issue #3900 (per the spec title): a `select` whose
# clauses all send on the same channel must complete exactly one send.
it "won't enqueue a dead/running fiber, fixed #3900" do
ch = Channel::Buffered(Int32).new(1)

spawn do
# First value goes out through a plain send before the select runs.
ch.send(1)

# Both clauses target the same channel. The expectations at the bottom
# require that only the first clause's value (1) is ever delivered.
select
when ch.send(1)
when ch.send(2)
end

ch.close
end

# Two values of 1 are expected (plain send + first select clause), after
# which the closed channel yields nil -- the `send(2)` clause never fires.
ch.receive.should eq(1)
ch.receive.should eq(1)
ch.receive?.should be_nil
end
end
94 changes: 71 additions & 23 deletions src/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ require "fiber"

abstract class Channel(T)
module SelectAction
getter? canceled = false
getter? waiting = false

abstract def ready?
abstract def execute
abstract def wait
abstract def unwait
abstract def wait : Bool
abstract def unwait(fiber : Fiber)
end

class ClosedError < Exception
Expand Down Expand Up @@ -61,16 +64,16 @@ abstract class Channel(T)
@receivers << Fiber.current
end

protected def unwait_for_receive
@receivers.delete Fiber.current
protected def unwait_for_receive(fiber)
@receivers.delete fiber
end

protected def wait_for_send
@senders << Fiber.current
end

protected def unwait_for_send
@senders.delete Fiber.current
protected def unwait_for_send(fiber)
@senders.delete fiber
end

protected def raise_if_closed
Expand All @@ -94,26 +97,65 @@ abstract class Channel(T)
nil
end

# :nodoc:
#
# Splat convenience overload: gathers the given actions and forwards them,
# as a single collection argument, to the collection-based `select` overload.
def self.select(*ops : SelectAction)
self.select ops
end

# :nodoc:
#
# Executes all operations inside its own fiber to wait in. Postpones the fiber
# execution so the fibers' array will always be filled with all fibers, and
# any ready operation can cancel all other fibers ASAP.
def self.select(ops : Tuple | Array, has_else = false)
loop do
ops.each_with_index do |op, index|
if op.ready?
result = op.execute
return index, result
end
# fast path: check if any clause is ready
ops.each_with_index do |op, i|
if op.ready?
return {i, op.execute}
end
end

if has_else
return {ops.size, nil}
end

if has_else
return ops.size, nil
# slow path: spawn fibers to wait on each clause
main = Fiber.current
fibers = Array(Fiber).new(ops.size)
index = -1
value = nil

ops.each_with_index do |op, i|
fibers << Fiber.new(name: i.to_s) do
loop do
break if op.canceled?

if op.ready?
# cancel other fibers before executing the op, which could switch
# the current context:
cancel_select_actions(ops, fibers, i)
index, value = i, op.execute
Crystal::Scheduler.enqueue(main)
break
end

op.wait
end
end
end

ops.each &.wait
Crystal::Scheduler.reschedule
ops.each &.unwait
Crystal::Scheduler.enqueue(fibers)
Crystal::Scheduler.reschedule

{index, value}
end

# Cancels every select action except the one at `running_index`.
#
# For each other action, the fiber stored at the same position in `fibers` is
# passed to the action's `unwait`; that fiber is then enqueued on the
# scheduler, but only when the action reports `waiting?`.
private def self.cancel_select_actions(ops, fibers, running_index)
  ops.each_with_index do |action, position|
    # The action that is about to execute must not be canceled.
    unless position == running_index
      waiter = fibers[position]
      action.unwait(waiter)
      Crystal::Scheduler.enqueue(waiter) if action.waiting?
    end
  end
end

Expand All @@ -128,7 +170,7 @@ abstract class Channel(T)
end

# :nodoc:
struct ReceiveAction(C)
class ReceiveAction(C)
include SelectAction

def initialize(@channel : C)
Expand All @@ -144,15 +186,18 @@ abstract class Channel(T)

def wait
@channel.wait_for_receive
@waiting = true
Crystal::Scheduler.reschedule
end

def unwait
@channel.unwait_for_receive
def unwait(fiber)
@canceled = true
@channel.unwait_for_receive(fiber)
end
end

# :nodoc:
struct SendAction(C, T)
class SendAction(C, T)
include SelectAction

def initialize(@channel : C, @value : T)
Expand All @@ -168,10 +213,13 @@ abstract class Channel(T)

def wait
@channel.wait_for_send
@waiting = true
Crystal::Scheduler.reschedule
end

def unwait
@channel.unwait_for_send
def unwait(fiber)
@canceled = true
@channel.unwait_for_send(fiber)
end
end
end
Expand Down
17 changes: 16 additions & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,24 @@ class Crystal::Scheduler
end

protected def resume(fiber : Fiber) : Nil
validate_resumable(fiber)
current, @current = @current, fiber
GC.stack_bottom = fiber.@stack_bottom
Fiber.swapcontext(pointerof(current.@stack_top), fiber.@stack_top)
Fiber.swapcontext(pointerof(current.@context), pointerof(fiber.@context))
end

# Aborts the process when asked to resume a fiber that cannot be resumed.
#
# Returns immediately when `fiber.resumable?` is true. Otherwise prints a
# FATAL message to file descriptor 2 (distinguishing a dead fiber from any
# other non-resumable one), followed by the current call stack, and exits
# with status 1.
#
# The fiber description and the backtrace lines are passed to `dprintf` as
# `%s` arguments rather than being interpolated into the format string, so a
# `%` inside a fiber name or a backtrace line is printed literally instead of
# being interpreted as a conversion specifier.
private def validate_resumable(fiber)
  return if fiber.resumable?

  reason = if fiber.dead?
    "tried to resume a dead fiber"
  else
    "can't resume a running fiber"
  end

  LibC.dprintf 2, "\nFATAL: %s: %s\n", reason, fiber.to_s

  caller.each { |line| LibC.dprintf(2, " from %s\n", line) }

  exit 1
end

protected def reschedule : Nil
Expand Down
Loading