Skip to content

Commit

Permalink
MT: rework Crystal::EventLoop and IO::Evented
Browse files Browse the repository at this point in the history
EventLoop now uses a single Event per Fiber, and declares a little
more generic API: `wait(io, :read | :write, timeout)` and
`sleep(time)`.

IO::Evented now initializes the Fiber Event for each Fiber that is
waiting for a file descriptor to be readable or writable (libevent2
allows multiple events to the same file descriptor), instead of
having a single event, which avoids an issue where concurrent fibers
could alter `#read_timeout` or `#write_timeout` and affect
previously set events.

IO::Evented is now thread-safe, and doesn't use a Deque anymore, but
Crystal::WaitDeque, a double-ended linked list of pending fibers —we
still need to keep a list of pending fibers to resume when an IO is
reopened or closed (EV_CLOSED is only available for the linux/epoll
backend).

The `sleep` methods now delegate to EventLoop, instead of Scheduler.
  • Loading branch information
ysbaddaden committed Mar 6, 2019
1 parent ab7bfb4 commit 2ea421a
Show file tree
Hide file tree
Showing 10 changed files with 232 additions and 147 deletions.
4 changes: 2 additions & 2 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ def sleep(seconds : Number)
raise ArgumentError.new "Sleep seconds must be positive"
end

Crystal::Scheduler.sleep(seconds.seconds)
Crystal::EventLoop.sleep(seconds.seconds)
end

# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
def sleep(time : Time::Span)
Crystal::Scheduler.sleep(time)
Crystal::EventLoop.sleep(time)
end

# Blocks the current fiber forever.
Expand Down
19 changes: 3 additions & 16 deletions src/concurrent/st_scheduler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,8 @@ class Crystal::Scheduler
Thread.current.scheduler.resume(fiber)
end

def self.sleep(time : Time::Span) : Nil
Thread.current.scheduler.sleep(time)
end

def self.yield : Nil
Thread.current.scheduler.yield
::sleep(0)
end

def self.yield(fiber : Fiber) : Nil
Expand Down Expand Up @@ -84,17 +80,8 @@ class Crystal::Scheduler
end
end

protected def sleep(time : Time::Span) : Nil
@current.resume_event.add(time)
reschedule
end

protected def yield : Nil
sleep(0.seconds)
end

protected def yield(fiber : Fiber) : Nil
@current.resume_event.add(0.seconds)
resume(fiber)
@runnables.unshift @current
fiber.resume
end
end
34 changes: 32 additions & 2 deletions src/crystal/event.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ require "./lib_event2"
struct Crystal::Event
VERSION = String.new(LibEvent2.event_get_version)

property? timed_out = false

def self.callback(&block : Int32, LibEvent2::EventFlags, Void* ->)
block
end
Expand All @@ -13,6 +15,12 @@ struct Crystal::Event
end

def add(timeout : LibC::Timeval? = nil)
@timed_out = false

{% if flag?(:mt) %}
@canceled.clear
{% end %}

if timeout
timeout_copy = timeout
LibEvent2.event_add(@event, pointerof(timeout_copy))
Expand All @@ -28,11 +36,29 @@ struct Crystal::Event
)
end

def del
LibEvent2.event_del(@event)
end

def free
LibEvent2.event_free(@event) unless @freed
@freed = true
end

{% if flag?(:mt) %}
@canceled = Atomic::Flag.new

def cancel(delete = true)
@canceled.test_and_set.tap do |success|
del if delete && success
end
end
{% end %}

def to_unsafe
@event
end

# :nodoc:
struct Base
def initialize
Expand All @@ -52,11 +78,15 @@ struct Crystal::Event
end
end

def new_event(s : Int32, flags : LibEvent2::EventFlags, data, &callback : LibEvent2::Callback)
event = LibEvent2.event_new(@base, s, flags, callback, data.as(Void*))
def event_new(fd : Int32, flags : LibEvent2::EventFlags, data, &callback : LibEvent2::Callback)
event = LibEvent2.event_new(@base, fd, flags, callback, data.as(Void*))
Event.new(event)
end

def event_assign(event : Event, fd : Int32, flags : LibEvent2::EventFlags, data, &callback : LibEvent2::Callback)
LibEvent2.event_assign(event, @base, fd, flags, callback, data.as(Void*))
end

def loop(flags : LibEvent2::EventLoopFlags = :none)
LibEvent2.event_base_loop(@base, flags)
end
Expand Down
60 changes: 34 additions & 26 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,42 +43,50 @@ module Crystal::EventLoop
end
{% end %}

def self.create_resume_event(fiber)
@@eb.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
def self.new_event : Event
@@eb.event_new(-1, :none, nil) { }
end

def self.wait(io : IO::Evented, what : LibEvent2::EventFlags, timeout = nil)
fiber = Fiber.current
event = fiber.event

@@eb.event_assign(event, io.fd, what, fiber) do |_, flags, data|
f = data.as(Fiber)
e = f.event
e.timed_out = true if flags.includes?(:timeout)

{% if flag?(:mt) %}
data.as(Fiber).enqueue
# only enqueue the fiber if we can cancel the event; the event may be
# canceled in parallel, and the fiber would end up being enqueued twice:
Crystal::Scheduler.enqueue(f) if e.cancel(delete: false)
{% else %}
data.as(Fiber).resume
f.resume
{% end %}
end
end
event.add(timeout)

def self.create_fd_write_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Write
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
Crystal::Scheduler.reschedule

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Write)
io_ref.resume_write
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_write(timed_out: true)
end
end
yield IO::Timeout.new if event.timed_out?
end

def self.create_fd_read_event(io : IO::Evented, edge_triggered : Bool = false)
flags = LibEvent2::EventFlags::Read
flags |= LibEvent2::EventFlags::Persist | LibEvent2::EventFlags::ET if edge_triggered
def self.sleep(time : Time::Span)
fiber = Fiber.current
event = fiber.event

@@eb.new_event(io.fd, flags, io) do |s, flags, data|
io_ref = data.as(typeof(io))
if flags.includes?(LibEvent2::EventFlags::Read)
io_ref.resume_read
elsif flags.includes?(LibEvent2::EventFlags::Timeout)
io_ref.resume_read(timed_out: true)
end
@@eb.event_assign(event, -1, :none, fiber) do |_, _, data|
f = data.as(Fiber)

{% if flag?(:mt) %}
Crystal::Scheduler.enqueue(f) # if f.event.cancel(delete: false)
{% else %}
f.resume
{% end %}
end
event.add(time)

Crystal::Scheduler.reschedule
end

private def self.dns_base
Expand Down
1 change: 1 addition & 0 deletions src/crystal/lib_event2.cr
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ lib LibEvent2
fun event_enable_debug_mode
fun event_reinit(eb : EventBase) : Int
fun event_new(eb : EventBase, s : EvutilSocketT, events : EventFlags, callback : Callback, data : Void*) : Event
fun event_assign(event : Event, eb : EventBase, s : EvutilSocketT, events : EventFlags, callback : Callback, data : Void*) : Event
fun event_free(event : Event)
fun event_add(event : Event, timeout : LibC::Timeval*) : Int
fun event_del(event : Event) : Int
Expand Down
2 changes: 1 addition & 1 deletion src/crystal/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ module Crystal
end

private def self.break_main_loop : Nil
@@main_fiber.as(Fiber).enqueue
Crystal::Scheduler.enqueue(@@main_fiber.as(Fiber))
end
{% end %}
end
Expand Down
118 changes: 118 additions & 0 deletions src/crystal/wait_deque.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
require "./spin_lock"

# :nodoc:
#
# A double-ended queue of pending fibers implemented as a doubly-linked list.
#
# Assumes that a `Fiber` can only be in a single `WaitQueue`, `WaitDeque` or
# `Scheduler::Runnables` at a time, during which its execution will be
# suspended.
#
# The queue is fiber and thread unsafe. Accesses must be synchronized using
# another mean, for example `Thread::Mutex` or `Crystal::SpinLock`.
struct Crystal::WaitDeque
@head : Fiber?
@tail : Fiber?
@spin = Crystal::SpinLock.new

def unshift(fiber : Fiber) : Nil
@spin.synchronize do
fiber.queue_prev = nil

if head = @head
fiber.queue_next = head
@head = head.queue_prev = fiber
else
fiber.queue_next = nil
@head = @tail = fiber
end
end
end

def push(fiber : Fiber) : Nil
@spin.synchronize do
fiber.queue_next = nil

if tail = @tail
fiber.queue_prev = tail
@tail = tail.queue_next = fiber
else
fiber.queue_prev = nil
@head = @tail = fiber
end
end
end

def <<(fiber)
push fiber
end

def shift? : Nil
@spin.synchronize do
return unless fiber = @head

if new_head = fiber.queue_next
new_head.queue_prev = nil
@head = new_head
else
@head = @tail = nil
end

fiber
end
end

def pop? : Nil
@spin.synchronize do
return unless fiber = @tail

if new_tail = fiber.queue_prev
new_tail.queue_next = nil
@tail = new_tail
else
@head = @tail = nil
end

fiber
end
end

def delete(fiber : Fiber) : Nil
@spin.synchronize do
prev_fiber = fiber.queue_prev
next_fiber = fiber.queue_next

if prev_fiber
prev_fiber.queue_next = next_fiber
else
@head = next_fiber
end

if next_fiber
next_fiber.queue_prev = prev_fiber
else
@tail = prev_fiber
end
end
end

def clear : self
@spin.synchronize do
copy = dup
@head = @tail = nil
copy
end
end

def unsafe_each : Nil
fiber = @head

while fiber
# avoid issues if fiber is queued somewhere else during the iteration by
# memorizing the next fiber to iterate:
next_fiber = fiber.queue_next
yield fiber
fiber = next_fiber
end
end
end
16 changes: 11 additions & 5 deletions src/crystal/wait_queue.cr
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# :nodoc:
#
# A linked-list of pending fibers. Assumes that a fiber can only be in a
# single wait queue at a time, during which its execution is suspended.
# A FIFO queue of pending fibers implemented as a singly-linked list.
#
# The linked-list is fiber and thread unsafe. Accesses must be synchronized
# using another mean, usually `Crystal::SpinLock`.
# Assumes that a `Fiber` can only be in a single `WaitQueue`, `WaitDeque` or
# `Scheduler::Runnables` at a time, during which its execution will be
# suspended.
#
# The queue is fiber and thread unsafe. Accesses must be synchronized using
# another mean, for example `Thread::Mutex` or `Crystal::SpinLock`.
struct Crystal::WaitQueue
@head : Fiber?
@tail : Fiber?
Expand All @@ -30,8 +33,11 @@ struct Crystal::WaitQueue
fiber = @head

while fiber
# avoid issues if fiber is queued somewhere else during the block call by
# memorizing the next fiber to iterate:
next_fiber = fiber.queue_next
yield fiber
fiber = fiber.queue_next
fiber = next_fiber
end
end

Expand Down
Loading

0 comments on commit 2ea421a

Please sign in to comment.