diff --git a/spec/std/channel_spec.cr b/spec/std/channel_spec.cr index c13571aec30e..f6f7daca28dd 100644 --- a/spec/std/channel_spec.cr +++ b/spec/std/channel_spec.cr @@ -1,44 +1,21 @@ require "spec" describe Channel do - it "creates unbuffered with no arguments" do - Channel(Int32).new.should be_a(Channel::Unbuffered(Int32)) - end - - it "creates buffered with capacity argument" do - Channel(Int32).new(32).should be_a(Channel::Buffered(Int32)) - end - - it "send returns channel" do - channel = Channel(Int32).new(1) - channel.send(1).should be(channel) - end - - it "does receive_first" do - channel = Channel(Int32).new(1) - channel.send(1) - Channel.receive_first(Channel(Int32).new, channel).should eq 1 - end - - it "does send_first" do - ch1 = Channel(Int32).new(1) - ch2 = Channel(Int32).new(1) - ch1.send(1) - Channel.send_first(2, ch1, ch2) - ch2.receive.should eq 2 - end -end - -describe Channel::Unbuffered do it "pings" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new spawn { ch.send(ch.receive) } ch.send 123 ch.receive.should eq(123) end - it "blocks if there is no receiver" do - ch = Channel::Unbuffered(Int32).new + it "send returns value of which send" do + channel = Channel(Int32).new(1) + channel.send(1).should eq(1) + end + + it "blocks in sending when buffer is full" do + ch = Channel(Int32).new + state = 0 spawn do state = 1 @@ -46,48 +23,46 @@ describe Channel::Unbuffered do state = 2 end + ch.full?.should be_true Fiber.yield state.should eq(1) + ch.full?.should be_true ch.receive.should eq(123) state.should eq(1) Fiber.yield state.should eq(2) end - it "deliver many senders" do - ch = Channel::Unbuffered(Int32).new - spawn { ch.send 1; ch.send 4 } - spawn { ch.send 2; ch.send 5 } - spawn { ch.send 3; ch.send 6 } + it "blocks in receiving when buffer is empty" do + ch = Channel(Int32).new - (1..6).map { ch.receive }.sort.should eq([1, 2, 3, 4, 5, 6]) - end + state = 0 + spawn do + state = 1 + ch.receive + state = 2 + end - it "gets not full when there is a sender" do - ch = Channel::Unbuffered(Int32).new - ch.full?.should be_true ch.empty?.should be_true - spawn { ch.send 123 } Fiber.yield - ch.empty?.should be_false - ch.full?.should be_true - ch.receive.should eq(123) + state.should eq(1) + ch.send 123 + Fiber.yield + state.should eq(2) end - it "works with select" do - ch1 = Channel::Unbuffered(Int32).new - ch2 = Channel::Unbuffered(Int32).new - spawn { ch1.send 123 } - Channel.select(ch1.receive_select_action, ch2.receive_select_action).should eq({0, 123}) - end + it "deliver many senders" do + ch = Channel(Int32).new - it "works with select else" do - ch1 = Channel::Unbuffered(Int32).new - Channel.select({ch1.receive_select_action}, true).should eq({1, nil}) + spawn { ch.send 1; ch.send 4 } + spawn { ch.send 2; ch.send 5 } + spawn { ch.send 3; ch.send 6 } + + (1..6).map { ch.receive }.sort.should eq([1, 2, 3, 4, 5, 6]) end it "can send and receive nil" do - ch = Channel::Unbuffered(Nil).new + ch = Channel(Nil).new spawn { ch.send nil } Fiber.yield ch.empty?.should be_false @@ -96,124 +71,27 @@ describe Channel::Unbuffered do end it "can be closed" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new ch.closed?.should be_false ch.close.should be_nil ch.closed?.should be_true expect_raises(Channel::ClosedError) { ch.receive } end - it "can be closed after sending" do - ch = Channel::Unbuffered(Int32).new - spawn { ch.send 123; ch.close } - ch.receive.should eq(123) - expect_raises(Channel::ClosedError) { ch.receive } - end - - it "can be closed from different fiber" do - ch = Channel::Unbuffered(Int32).new - received = false - spawn { expect_raises(Channel::ClosedError) { ch.receive }; received = true } - Fiber.yield - ch.close - Fiber.yield - received.should be_true - end - it "cannot send if closed" do - ch = Channel::Unbuffered(Int32).new + ch = Channel(Int32).new ch.close expect_raises(Channel::ClosedError) { ch.send 123 } end - - it "can receive? when closed" do - ch = Channel::Unbuffered(Int32).new - ch.close - ch.receive?.should be_nil - end - - it "can receive? when not empty" do - ch = Channel::Unbuffered(Int32).new - spawn { ch.send 123 } - ch.receive?.should eq(123) - end -end - -describe Channel::Buffered do - it "pings" do - ch = Channel::Buffered(Int32).new - spawn { ch.send(ch.receive) } - ch.send 123 - ch.receive.should eq(123) - end - - it "blocks when full" do - ch = Channel::Buffered(Int32).new(2) - freed = false - spawn { 2.times { ch.receive }; freed = true } - - ch.send 1 - ch.full?.should be_false - freed.should be_false - - ch.send 2 - ch.full?.should be_true - freed.should be_false - - ch.send 3 - ch.full?.should be_false - freed.should be_true - end - - it "doesn't block when not full" do - ch = Channel::Buffered(Int32).new - done = false - spawn { ch.send 123; done = true } - done.should be_false - Fiber.yield - done.should be_true - end - - it "gets ready with data" do - ch = Channel::Buffered(Int32).new - ch.empty?.should be_true - ch.send 123 - ch.empty?.should be_false - end - - it "works with select" do - ch1 = Channel::Buffered(Int32).new - ch2 = Channel::Buffered(Int32).new - spawn { ch1.send 123 } - Channel.select(ch1.receive_select_action, ch2.receive_select_action).should eq({0, 123}) - end - - it "can send and receive nil" do - ch = Channel::Buffered(Nil).new - spawn { ch.send nil } - Fiber.yield - ch.empty?.should be_false - ch.receive.should be_nil - ch.empty?.should be_true - end - - it "can be closed" do - ch = Channel::Buffered(Int32).new - ch.closed?.should be_false - ch.close - ch.closed?.should be_true - expect_raises(Channel::ClosedError) { ch.receive } - end - it "can be closed after sending" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new spawn { ch.send 123; ch.close } ch.receive.should eq(123) expect_raises(Channel::ClosedError) { ch.receive } end it "can be closed from different fiber" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new received = false spawn { expect_raises(Channel::ClosedError) { ch.receive }; received = true } Fiber.yield @@ -222,41 +100,65 @@ describe Channel::Buffered do received.should be_true end - it "cannot send if closed" do - ch = Channel::Buffered(Int32).new + it "can send? when closed" do + ch = Channel(Int32).new ch.close - expect_raises(Channel::ClosedError) { ch.send 123 } + ch.send?(123).should be_nil end it "can receive? when closed" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new ch.close ch.receive?.should be_nil end it "can receive? when not empty" do - ch = Channel::Buffered(Int32).new + ch = Channel(Int32).new 1 spawn { ch.send 123 } ch.receive?.should eq(123) end - it "does inspect on unbuffered channel" do - ch = Channel::Unbuffered(Int32).new - ch.inspect.should eq("#") + it "does receive_first" do + channel = Channel(Int32).new(1) + channel.send(1) + Channel.receive_first(Channel(Int32).new, channel).should eq 1 + end + + it "does send_first" do + ch1 = Channel(Int32).new(1) + ch2 = Channel(Int32).new(1) + ch1.send(1) + Channel.send_first(2, ch1, ch2) + ch2.receive.should eq 2 end - it "does inspect on buffered channel" do - ch = Channel::Buffered(Int32).new(10) - ch.inspect.should eq("#") + it "works with select" do + ch1 = Channel(Int32).new 1 + ch2 = Channel(Int32).new 1 + ch1.send(123) + state = 0 + Channel.select do |x| + x.receive_action ch1 do |val| + val.should eq(123) + state = 1 + end + + x.receive_action ch2 do |val| + state = 2 + end + end + state.should eq 1 end - it "does pretty_inspect on unbuffered channel" do - ch = Channel::Unbuffered(Int32).new - ch.pretty_inspect.should eq("#") + it "work with select" do + ch1 = Channel(Int32).new + ch2 = Channel(Int32).new + spawn { ch1.send 123 } + Channel.select(ch1.receive_select_action, ch2.receive_select_action).should eq({0, 123}) end - it "does pretty_inspect on buffered channel" do - ch = Channel::Buffered(Int32).new(10) - ch.pretty_inspect.should eq("#") + it "works with select else" do + ch1 = Channel(Int32).new + Channel.select({ch1.receive_select_action}, true).should eq({1, nil}) end end diff --git a/src/concurrent/channel.cr b/src/concurrent/channel.cr index 2070961279f4..cb4c74444cd4 100644 --- a/src/concurrent/channel.cr +++ b/src/concurrent/channel.cr @@ -1,272 +1,690 @@ require "fiber" -abstract class Channel(T) - module SelectAction - abstract def ready? - abstract def execute - abstract def wait - abstract def unwait - end - +class Channel(T) + # Raised when send value to closed channel or receive value from empty closed channel class ClosedError < Exception def initialize(msg = "Channel is closed") super(msg) end end - def initialize + # Creates a new empty Channel with `capacity` + # + # The `capacity` is the size of buffer. If `capacity` is zero, then the channel is unbuffered; + # Otherwise it is buffered channel. + # + # ``` + # unbuffered_ch = Channel(Int32).new + # buffered_ch = Channel(Int32).new 5 + # ``` + def initialize(@capacity = 0) + # queue store availiable data if status > 0, otherwise it + # store incomplete task need to be set data + @queue = Deque(SimpleIVar(T) | SelectIVar(T)).new + + # the number of difference between send and wait, and it will not exceed @capacity + @status = 0 + + # store send to send_wait if queue is full + @send_wait = Deque(Tuple(T, SimpleIVar(T) | SelectIVar(T))).new + + # true if channel closed @closed = false - @senders = Deque(Fiber).new - @receivers = Deque(Fiber).new + + # for synchronize + @mutex = Thread::Mutex.new + + # priority for execute select + @priority = 0_u64 + + # for priority synchronize, spin lock would be better + @priority_lock = Thread::Mutex.new + + # the count of the thread hold priority + @priority_ref_count = 0 end - def self.new : Unbuffered(T) - Unbuffered(T).new + # Send value into channel. It returns value which send when send is complete. + # Raise `ClosedError` if closed. + # + # ``` + # channel = Channel(Int32).new 1 + # channel.send 2 # => channel + # ``` + def send(value : T) + send_impl(value) { raise ClosedError.new }.get end - def self.new(capacity) : Buffered(T) - Buffered(T).new(capacity) + # Send value into channel. It returns value which send when send is complete. + # Returns `nil` if closed. + def send?(value : T) + send_impl(value) { return nil }.get? end - def close - @closed = true - Scheduler.enqueue @receivers - @receivers.clear - nil + private def send_impl(value : T) + @mutex.synchronize do + yield if @closed + loop do + if @status >= @capacity + ivar = SimpleIVar(T).new + tuple = {value, ivar} + @send_wait.push tuple + return ivar + elsif @status >= 0 + ivar = SimpleIVar(T).new + ivar.value = value + @queue.push ivar + @status += 1 + return ivar + else + ivar = @queue.shift + @status += 1 + + # make sure receive ivar is incomplete + next unless ivar.try_set_value? value + return ivar + end + end + end end - def closed? - @closed + protected def send?(value : T, wait_ivar) : Nil + send_impl(value, wait_ivar) do + wait_ivar.try_set_error? ClosedError.new + break nil + end + end + + protected def send_impl(value : T, wait_ivar) : Nil + @mutex.synchronize do + yield if @closed + loop do + if @status >= @capacity + tuple = {value, wait_ivar} + @send_wait.push tuple + elsif @status >= 0 + # make sure send ivar is incomplete + if wait_ivar.try_set_value? value + @queue.push wait_ivar + @status += 1 + end + else + # make sure recieve ivar is complete + if (ivar = @queue.first).try_complete? + # make sure send ivar is complete + if wait_ivar.try_set_value? value + ivar.complete_set_value value + @queue.shift + @status += 1 + else + ivar.reset + end + else + @queue.shift + @status += 1 + next + end + end + + break + end + end end + # receive value from channel. It returns value when receive is complete. + # Raise `ClosedError` if closed. + # + # ``` + # ch = Channel(Int32).new + # ch.send 1 + # ch.receive # => 1 + # ``` def receive - receive_impl { raise ClosedError.new } + receive_impl { raise ClosedError.new }.get end + # Recieve value from channel. It returns value when receive is complete. + # Returns `nil` if closed. def receive? - receive_impl { return nil } + receive_impl { return nil }.get? + end + + protected def receive_impl + @mutex.synchronize do + loop do + if tuple = @send_wait.shift? + # make sure send ivar is incomplete + next unless tuple[1].try_set_value? tuple[0] + @queue.push tuple[1] + return @queue.shift + elsif @status > 0 + @status -= 1 + return @queue.shift + else + yield if @closed + ivar = SimpleIVar(T).new + @queue.push ivar + @status -= 1 + return ivar + end + end + end end - def inspect(io) - to_s(io) + protected def receive?(ivar) + receive_impl(ivar) do + ivar.try_set_error? ClosedError.new + break nil + end end - def pretty_print(pp) - pp.text inspect + protected def receive_impl(ivar) : Nil + @mutex.synchronize do + loop do + if tuple = @send_wait.first? + # make sure recieve ivar is incomplete + if ivar.try_complete? + @send_wait.shift + + # make sure send ivar is incomplete + if tuple[1].try_set_value? tuple[0] + @queue.push tuple[1] + livar = @queue.shift + ivar.complete_set_value livar.get + else + ivar.reset + next + end + end + elsif @status > 0 + livar = @queue.first + + # make sure recieve ivar is not completed + if ivar.try_set_value? livar.get + @queue.shift + @status -= 1 + end + else + yield if @closed + @queue.push ivar + @status -= 1 + end + + break + end + end end - def wait_for_receive - @receivers << Fiber.current + def send_select_action(value : T) + SendAction.new self, value end - def unwait_for_receive - @receivers.delete Fiber.current + def receive_select_action + ReceiveAction.new self + end + + # Close channel. It is able to receive value if there are remaining send values. + # + # ``` + # ch = Channel(Int32).new + # ch.close + # ``` + def close : Nil + return if @closed + @mutex.synchronize do + @closed = true + if @status < 0 + @queue.each &.try_set_error? ClosedError.new + @queue.clear + @status = 0 + end + end end - def wait_for_send - @senders << Fiber.current + # Returns `true` if channel closed, otherwise `false`. + # + # ``` + # ch = Channel(Int32).new + # ch.closed? # => false + # ch.close + # ch.closed? # => true + # ``` + def closed? + @closed end - def unwait_for_send - @senders.delete Fiber.current + # Returns `true` if the buffer of channel is full, otherwise `false`. + # + # ``` + # ch = Channel(Int32).new 1 + # ch.full? # => false + # ch.send 1 + # ch.full? # => true + # ``` + def full? + @status >= @capacity end - protected def raise_if_closed - raise ClosedError.new if @closed + # Returns `true` if the buffer of channel is empty, otherwise `false`. + # + # ``` + # ch = Channel(Int32).new 1 + # ch.empty? # => true + # ch.send 1 + # ch.empty? # => false + # ``` + def empty? + if @capacity > 0 + @status <= 0 + else + @send_wait.empty? + end end - def self.receive_first(*channels) - receive_first channels + protected def acquire_priority + @priority_lock.synchronize do + @priority = Random.rand(UInt64::MAX) if @priority_ref_count == 0 + @priority_ref_count += 1 + return @priority + end end - def self.receive_first(channels : Tuple | Array) - self.select(channels.map(&.receive_select_action))[1] + protected def release_priority + @priority_lock.synchronize do + @priority_ref_count -= 1 + end end - def self.send_first(value, *channels) - send_first value, channels + # Send first value into given channels. + # + # ``` + # ch1 = Channel(Int32).new 1 + # ch2 = Channel(Int32).new 1 + # ch1.send 1 + # Channel.send_first(2, ch1, ch2) + # ch2.receive # => 2 + # ``` + def self.send_first(value : T, *channels : Channel(T)) forall T + wait_ivar = SimpleIVar(T).new + channels.each &.send?(value, wait_ivar) + wait_ivar.get + nil end - def self.send_first(value, channels : Tuple | Array) - self.select(channels.map(&.send_select_action(value))) + # Send first value into given channels. + def self.send_first(value : T, channels : Array(Channel(T))) forall T + wait_ivar = SimpleIVar(T).new + channels.each &.send?(value, wait_ivar) + wait_ivar.get nil end - def self.select(*ops : SelectAction) - self.select ops + # Receive first value from given channels. + # + # ``` + # ch1 = Channel(Int32).new 1 + # ch2 = Channel(Int32).new 1 + # ch1.send 1 + # Channel.receive_first(ch1, ch2) # => 1 + # ``` + def self.receive_first(*channels : Channel(T)) forall T + ivar = SimpleIVar(T).new + channels.each &.receive?(ivar) + ivar.get + end + + # Receive first value from given channels. + def self.receive_first(channels : Array(Channel(T))) forall T + ivar = SimpleIVar(T).new + channels.each &.receive?(ivar) + ivar.get + end + + # Select one of action to execute. + # + # ``` + # ch1 = Channel(Int32).new 1 + # ch2 = Channel(Int32).new 1 + # ch1.send 123 + # status = 0 + # Channel.select do |x| + # x.receive_action ch1 do |val| + # val # => 123 + # status = 1 + # end + # + # x.receive_action ch2 do |val| + # status = 2 + # end + # end + # status # => 1 + # ``` + def self.select + yield selector = Selector.new + selector.run + end + + def self.select(ops : Tuple, has_else = false) + self.select *ops, has_else: has_else + end + + def self.select(*ops : *T, has_else = false) forall T + idx, ret = -1, nil + self.select do |x| + {% for i in 0...T.size %} + ops[{{i}}].execute x do |val| + idx, ret = {{i}}, val + end + {% end %} + + if has_else + x.default_action do + idx, ret = ops.size, nil + end + end + end + return idx, ret end - def self.select(ops : Tuple | Array, has_else = false) - loop do + def self.select(ops : Array, has_else = false) + idx, ret = -1, nil + self.select do |x| ops.each_with_index do |op, index| - if op.ready? - result = op.execute - return index, result + op.execute x do |val| + idx, ret = index, val end end if has_else - return ops.size, nil + x.default_action do + idx, ret = ops.size, nil + end end - - ops.each &.wait - Scheduler.reschedule - ops.each &.unwait end + return idx, ret end - def send_select_action(value : T) - SendAction.new(self, value) - end - - def receive_select_action - ReceiveAction.new(self) - end + private class Selector + @ivar = SimpleIVar(->).new + @actions = [] of Tuple({UInt64, UInt64}, Proc(Nil)) + @cleanups = [] of Proc(Nil) - struct ReceiveAction(C) - include SelectAction + def send_action(ch : Channel(T), value : T, &block : T ->) forall T + tuple = Tuple.new Tuple.new(ch.acquire_priority, ch.object_id), ->do + svar = SelectIVar(T).new @ivar, &block + ch.send?(value, svar) + nil + end - def initialize(@channel : C) + @actions << tuple + @cleanups << ->do + ch.release_priority + nil + end end - def ready? - !@channel.empty? - end + def receive_action(ch : Channel(T), &block : T ->) forall T + tuple = Tuple.new Tuple.new(ch.acquire_priority, ch.object_id), ->do + svar = SelectIVar(T).new @ivar, &block + ch.receive?(svar) + nil + end - def execute - @channel.receive + @actions << tuple + @cleanups << ->do + ch.release_priority + nil + end end - def wait - @channel.wait_for_receive + def default_action(&@default : ->) end - def unwait - @channel.unwait_for_receive + protected def run + @actions.group_by { |id, proc| id } + .map { |id, arr| arr[Random.rand(arr.size)] } + .sort_by! { |id, proc| id } + .each { |id, proc| proc.call } + @cleanups.each &.call + + @default.try { |x| @ivar.try_set_value? x } + @ivar.get.call end end - struct SendAction(C, T) - include SelectAction + module SelectAction(T) + abstract def execute(selector, &block : T ->) + end - def initialize(@channel : C, @value : T) - end + struct SendAction(T) + include SelectAction(T) - def ready? - !@channel.full? + def initialize(@ch : Channel(T), @value : T) end - def execute - @channel.send(@value) + def execute(selector, &block : T ->) + selector.send_action @ch, @value, &block end + end - def wait - @channel.wait_for_send + struct ReceiveAction(T) + include SelectAction(T) + + def initialize(@ch : Channel(T)) end - def unwait - @channel.unwait_for_send + def execute(selector, &block : T ->) + selector.receive_action @ch, &block end end end -class Channel::Buffered(T) < Channel(T) - def initialize(@capacity = 32) - @queue = Deque(T).new(@capacity) - super() +private module Status + INCOMPLETE = 0 + MAYCOMPLETE = 1 + COMPLETED = 2 + FAULTED = 3 +end + +private class SimpleIVar(T) + @value : T? + @error : Exception? + @status = Atomic(Int32).new Status::INCOMPLETE + @cur_fiber : Fiber? + + def get + get_impl { raise @error.not_nil! } end - def send(value : T) - while full? - raise_if_closed - @senders << Fiber.current - Scheduler.reschedule + def get? + get_impl { break nil } + end + + private def get_impl + wait + case @status.get + when Status::COMPLETED + return @value.as(T) + when Status::FAULTED + yield + else + raise "compiler bug" end + end - raise_if_closed + def value=(value : T) : Nil + unless try_set_value? value + raise "Invalid Operation!" + end + end - @queue << value - Scheduler.enqueue @receivers - @receivers.clear + def try_set_value?(value : T) + if try_complete? + complete_set_value value + true + else + false + end + end - self + def error=(error : Exception) : Nil + unless try_set_error? error + raise "Invalid Operation!" + end end - private def receive_impl - while empty? - yield if @closed - @receivers << Fiber.current - Scheduler.reschedule + def try_set_error?(error : Exception) + if try_complete? + complete_set_error error + true + else + false end + end - @queue.shift.tap do - Scheduler.enqueue @senders - @senders.clear + def try_complete? + loop do + if (tuple = @status.compare_and_set(Status::INCOMPLETE, Status::MAYCOMPLETE))[1] + return true + elsif tuple[0] != Status::MAYCOMPLETE + return false + end end end - def full? - @queue.size >= @capacity + def reset : Nil + unless @status.compare_and_set(Status::MAYCOMPLETE, Status::INCOMPLETE)[1] + raise "Invalid Status!" + end end - def empty? - @queue.empty? + def complete_set_value(value : T) : Nil + @value = value + @cur_fiber.try { |x| Scheduler.enqueue x } + unless @status.compare_and_set(Status::MAYCOMPLETE, Status::COMPLETED)[1] + raise "Invalid Status!" + end end -end -class Channel::Unbuffered(T) < Channel(T) - @sender : Fiber? + def complete_set_error(error : Exception) : Nil + @error = error + @cur_fiber.try { |x| Scheduler.enqueue x } + unless @status.compare_and_set(Status::MAYCOMPLETE, Status::FAULTED)[1] + raise "Invalid Status!" + end + end - def initialize - @has_value = false - @value = uninitialized T - super + def wait + wait_impl do + if try_complete? + @cur_fiber = Fiber.current + reset + Scheduler.reschedule + end + end end - def send(value : T) - while @has_value - raise_if_closed - @senders << Fiber.current - Scheduler.reschedule + def wait_impl + while @status.get < Status::COMPLETED + yield end + end +end - raise_if_closed +private class SelectIVar(T) + @value : T? + @error : Exception? + @status = Atomic(Int32).new Status::INCOMPLETE + @proc : -> + @cur_fiber : Fiber? - @value = value - @has_value = true - @sender = Fiber.current + def initialize(@ivar : SimpleIVar(->), &block : T ->) + @proc = ->{ block.call(get) } + end + + def get + get_impl { raise @error.not_nil! } + end - if receiver = @receivers.shift? - receiver.resume + def get? + get_impl { break nil } + end + + private def get_impl + wait + case @status.get + when Status::COMPLETED + return @value.as(T) + when Status::FAULTED + yield else - Scheduler.reschedule + raise "compiler bug" end end - private def receive_impl - until @has_value - yield if @closed - @receivers << Fiber.current - if sender = @senders.shift? - sender.resume - else - Scheduler.reschedule - end + def value=(value : T) : Nil + unless try_set_value? value + raise "Invalid Operation!" end + end - yield if @closed + def try_set_value?(value : T) + if @ivar.try_set_value? @proc + @value = value + @cur_fiber.try { |x| Scheduler.enqueue x } + @status.lazy_set(Status::COMPLETED) + true + else + false + end + end - @value.tap do - @has_value = false - Scheduler.enqueue @sender.not_nil! + def error=(error : Exception) : Nil + unless try_set_error? error + raise "Invalid Operation!" end end - def empty? - !@has_value + def try_set_error?(error : Exception) + if @ivar.try_set_value? @proc + @error = error + @cur_fiber.try { |x| Scheduler.enqueue x } + @status.lazy_set(Status::FAULTED) + true + else + false + end end - def full? - @has_value || @receivers.empty? + delegate try_complete?, reset, to: @ivar + + def complete_set_value(value : T) : Nil + @ivar.complete_set_value @proc + @value = value + @cur_fiber.try { |x| Scheduler.enqueue x } + @status.lazy_set(Status::COMPLETED) + end + + def complete_set_error(error : Exception) : Nil + @ivar.complete_set_value @proc + @error = error + @cur_fiber.try { |x| Scheduler.enqueue x } + @status.lazy_set(Status::FAULTED) + end + + def wait + wait_impl do + if try_complete? + @cur_fiber = Fiber.current + reset + Scheduler.reschedule + end + end + end + + def wait_impl + while @status.get < Status::COMPLETED + yield + end end end diff --git a/src/event/signal_child_handler.cr b/src/event/signal_child_handler.cr index 16b4eaa17234..b5d6459a14d7 100644 --- a/src/event/signal_child_handler.cr +++ b/src/event/signal_child_handler.cr @@ -11,7 +11,7 @@ class Event::SignalChildHandler end end - alias ChanType = Channel::Buffered(Process::Status?) + alias ChanType = Channel(Process::Status?) def initialize @pending = Hash(LibC::PidT, Process::Status).new