From f402c58a17c331e9fe5c462424d2ab62a238a8d8 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 19 Oct 2018 17:56:36 +0200 Subject: [PATCH 1/4] Introduce Fiber::Context struct The fiber context holds the current stack top pointer (for a saved stack), as well as a resumable flag that tells whether the fiber can be resumed (its context was fully saved) or is currently running, or is dead (its proc returned). The resumable flag is required to prevent a fiber to be resumed while it's still running. This should usually not happen with the monothreaded scheduler, but still useful to find a buggy synchronisation primitive that would enqueue the same fiber multiple times (e.g. the current `select` implementation). This will be very frequent with a multithreaded scheduler where a thread B could try to resume a fiber that has just been enqueued by thread A but didn't fully store its context yet. The fiber status, running or dead, is monitored using an `@alive` boolean. We can't set `context.resumable` to another value, because the scheduler eventually swaps the context to another fiber, which would overwrite `context.resumable`. The scheduler now aborts with a fatal error whenever it would resume a running or dead fiber, which would otherwise lead to a segfault. It doesn't just ignore the fiber since to enqueue a fiber twice or a dead fiber is an error that must be fixed. --- src/crystal/scheduler.cr | 17 +++++++- src/fiber.cr | 70 +++++++++++++++----------------- src/fiber/aarch64.cr | 62 ---------------------------- src/fiber/arm.cr | 58 -------------------------- src/fiber/context.cr | 55 +++++++++++++++++++++++++ src/fiber/context/aarch64.cr | 79 ++++++++++++++++++++++++++++++++++++ src/fiber/context/arm.cr | 72 ++++++++++++++++++++++++++++++++ src/fiber/context/i686.cr | 37 +++++++++++++++++ src/fiber/context/x86_64.cr | 41 +++++++++++++++++++ src/fiber/i686.cr | 33 --------------- src/fiber/x86_64.cr | 37 ----------------- 11 files changed, 333 insertions(+), 228 deletions(-) delete mode 100644 src/fiber/aarch64.cr delete mode 100644 src/fiber/arm.cr create mode 100644 src/fiber/context.cr create mode 100644 src/fiber/context/aarch64.cr create mode 100644 src/fiber/context/arm.cr create mode 100644 src/fiber/context/i686.cr create mode 100644 src/fiber/context/x86_64.cr delete mode 100644 src/fiber/i686.cr delete mode 100644 src/fiber/x86_64.cr diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index 57345d948193..eab789d1310a 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -57,9 +57,24 @@ class Crystal::Scheduler end protected def resume(fiber : Fiber) : Nil + validate_resumable(fiber) current, @current = @current, fiber GC.stack_bottom = fiber.@stack_bottom - Fiber.swapcontext(pointerof(current.@stack_top), fiber.@stack_top) + Fiber.swapcontext(pointerof(current.@context), pointerof(fiber.@context)) + end + + private def validate_resumable(fiber) + return if fiber.resumable? + + if fiber.dead? + LibC.dprintf 2, "\nFATAL: tried to resume a dead fiber: #{fiber}\n" + else + LibC.dprintf 2, "\nFATAL: can't resume a running fiber: #{fiber}\n" + end + + caller.each { |line| LibC.dprintf(2, " from #{line}\n") } + + exit 1 end protected def reschedule : Nil diff --git a/src/fiber.cr b/src/fiber.cr index bd37dc955c85..2152c7dffb34 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -1,38 +1,6 @@ require "c/sys/mman" require "thread/linked_list" - -# Load the arch-specific methods to create a context and to swap from one -# context to another one. There are two methods: `Fiber#makecontext` and -# `Fiber.swapcontext`. -# -# - `Fiber.swapcontext(current_stack_ptr : Void**, dest_stack_ptr : Void*) -# -# A fiber context switch in Crystal is achieved by calling a symbol (which -# must never be inlined) that will push the callee-saved registers (sometimes -# FPU registers and others) on the stack, saving the current stack pointer at -# location pointed by `current_stack_ptr` (the current fiber is now paused) -# then loading the `dest_stack_ptr` pointer into the stack pointer register -# and popping previously saved registers from the stack. Upon return from the -# symbol the new fiber is resumed since we returned/jumped to the calling -# symbol. -# -# Details are arch-specific. For example: -# - which registers must be saved, the callee-saved are sometimes enough (X86) -# but some archs need to save the FPU register too (ARMHF); -# - a simple return may be enough (X86), but sometimes an explicit jump is -# required to not confuse the stack unwinder (ARM); -# - and more. -# -# For the initial resume, the register holding the first parameter must be set -# (see makecontext below) and thus must also be saved/restored. -# -# - `Fiber#makecontext(stack_ptr : Void*, fiber_main : Fiber ->)` -# -# `makecontext` is responsible to reserve and initialize space on the stack -# for the initial context and save the initial `@stack_top` pointer. The first -# time a fiber is resumed, the `fiber_main` proc must be called, passing -# `self` as its first argument. -require "./fiber/*" +require "./fiber/context" # :nodoc: @[NoInline] @@ -42,17 +10,25 @@ fun _fiber_get_stack_top : Void* end class Fiber + class ResumeError < Exception + getter fiber : Fiber + + def initialize(message : String, @fiber : Fiber) + super message + end + end + STACK_SIZE = 8 * 1024 * 1024 @@fibers = Thread::LinkedList(Fiber).new @@stack_pool = [] of Void* + @context : Context @stack : Void* @resume_event : Crystal::Event? - @stack_top = Pointer(Void).null - protected property stack_top : Void* protected property stack_bottom : Void* property name : String? + @alive = true # :nodoc: property next : Fiber? @@ -66,6 +42,7 @@ class Fiber end def initialize(@name : String? = nil, &@proc : ->) + @context = Context.new @stack = Fiber.allocate_stack @stack_bottom = @stack + STACK_SIZE @@ -86,7 +63,7 @@ class Fiber # :nodoc: def initialize(@stack : Void*) @proc = Proc(Void).new { } - @stack_top = _fiber_get_stack_top + @context = Context.new(_fiber_get_stack_top) @stack_bottom = GC.stack_bottom @name = "main" @@ -144,6 +121,7 @@ class Fiber # Delete the resume event if it was used by `yield` or `sleep` @resume_event.try &.free + @alive = false Crystal::Scheduler.reschedule end @@ -151,6 +129,24 @@ class Fiber Crystal::Scheduler.current_fiber end + # The fiber's proc is currently running or didn't fully save its context. The + # fiber can't be resumed. + def running? + @context.resumable == 0 + end + + # The fiber's proc is currently not running and fully save its context. The + # fiber can be resumed safely. + def resumable? + @context.resumable == 1 + end + + # The fiber's proc has terminated, and the fiber is now considered dead. The + # fiber is impossible to resume, ever. + def dead? + @alive == false + end + def resume : Nil Crystal::Scheduler.resume(self) end @@ -179,7 +175,7 @@ class Fiber protected def push_gc_roots # Push the used section of the stack - GC.push_stack @stack_top, @stack_bottom + GC.push_stack @context.stack_top, @stack_bottom end # pushes the stack of pending fibers when the GC wants to collect memory: diff --git a/src/fiber/aarch64.cr b/src/fiber/aarch64.cr deleted file mode 100644 index 63b6d40ab89e..000000000000 --- a/src/fiber/aarch64.cr +++ /dev/null @@ -1,62 +0,0 @@ -{% skip_file unless flag?(:aarch64) %} - -class Fiber - # :nodoc: - def makecontext(stack_ptr, fiber_main) : Void* - # in ARMv8, the context switch push/pops 12 registers + 8 FPU registers. - # add one more to store the argument of `fiber_main` (+ alignment) - @stack_top = (stack_ptr - 22).as(Void*) - - stack_ptr[-2] = self.as(Void*) # this will be `pop` into r0 (first argument) - stack_ptr[-14] = fiber_main.pointer # initial `resume` will `ret` to this address - end - - # :nodoc: - @[NoInline] - @[Naked] - def self.swapcontext(current, to) : Nil - # adapted from https://github.com/ldc-developers/druntime/blob/ldc/src/core/threadasm.S - # - # preserve/restore AAPCS64 registers - # x19-x28 5.1.1 64-bit callee saved - # x29 fp, or possibly callee saved reg - depends on platform choice 5.2.3) - # x30 lr - # x0 self argument (initial call) - # d8-d15 5.1.2 says callee only must save bottom 64-bits (the "d" regs) - asm(" - stp d15, d14, [sp, #-22*8]! - stp d13, d12, [sp, #2*8] - stp d11, d10, [sp, #4*8] - stp d9, d8, [sp, #6*8] - stp x30, x29, [sp, #8*8] // lr, fp - stp x28, x27, [sp, #10*8] - stp x26, x25, [sp, #12*8] - stp x24, x23, [sp, #14*8] - stp x22, x21, [sp, #16*8] - stp x20, x19, [sp, #18*8] - stp x0, x1, [sp, #20*8] // self, alignment - - mov x19, sp - str x19, [$0] - mov sp, $1 - - ldp x0, x1, [sp, #20*8] // self, alignment - ldp x20, x19, [sp, #18*8] - ldp x22, x21, [sp, #16*8] - ldp x24, x23, [sp, #14*8] - ldp x26, x25, [sp, #12*8] - ldp x28, x27, [sp, #10*8] - ldp x30, x29, [sp, #8*8] // lr, fp - ldp d9, d8, [sp, #6*8] - ldp d11, d10, [sp, #4*8] - ldp d13, d12, [sp, #2*8] - ldp d15, d14, [sp], #22*8 - - // avoid a stack corruption that will confuse the unwinder - mov x16, x30 // save lr - mov x30, #0 // reset lr - br x16 // jump to new pc value - " - :: "r"(current), "r"(to)) - end -end diff --git a/src/fiber/arm.cr b/src/fiber/arm.cr deleted file mode 100644 index ce68a717b01c..000000000000 --- a/src/fiber/arm.cr +++ /dev/null @@ -1,58 +0,0 @@ -{% skip_file unless flag?(:arm) %} - -class Fiber - # :nodoc: - def makecontext(stack_ptr, fiber_main) : Void* - # in ARMv6 / ARVMv7, the context switch push/pops 8 registers. - # add one more to store the argument of `fiber_main`: - {% if flag?(:armhf) %} - # add 8 FPU registers (64-bit). - @stack_top = (stack_ptr - (9 + 16)).as(Void*) - {% else %} - @stack_top = (stack_ptr - 9).as(Void*) - {% end %} - - stack_ptr[0] = fiber_main.pointer # initial `resume` will `ret` to this address - stack_ptr[-9] = self.as(Void*) # this will be `pop` into r0 (first argument) - end - - # :nodoc: - @[NoInline] - @[Naked] - def self.swapcontext(current, to) : Nil - # eventually reset LR to zero to avoid the ARM unwinder to mistake the - # context switch as a regular call. - - {% if flag?(:armhf) %} - asm(" - .fpu vfp - - stmdb sp!, {r0, r4-r11, lr} - vstmdb sp!, {d8-d15} - str sp, [$0] - - mov sp, $1 - vldmia sp!, {d8-d15} - ldmia sp!, {r0, r4-r11, lr} - - mov r1, lr - mov lr, #0 - mov pc, r1 - " - :: "r"(current), "r"(to)) - {% elsif flag?(:arm) %} - asm(" - stmdb sp!, {r0, r4-r11, lr} - str sp, [$0] - - mov sp, $1 - ldmia sp!, {r0, r4-r11, lr} - - mov r1, lr - mov lr, #0 - mov pc, r1 - " - :: "r"(current), "r"(to)) - {% end %} - end -end diff --git a/src/fiber/context.cr b/src/fiber/context.cr new file mode 100644 index 000000000000..a31734a076dd --- /dev/null +++ b/src/fiber/context.cr @@ -0,0 +1,55 @@ +class Fiber + # :nodoc: + # + # The arch-specific make/swapcontext assembly relies on the Context struct and + # expects the following layout. Avoid moving the struct properties as it would + # require to update all the make/swapcontext implementations. + @[Extern] + struct Context + property stack_top : Void* + property resumable : LibC::Long + + def initialize(@stack_top = Pointer(Void).null) + @resumable = 0 + end + end + + # :nodoc: + # + # A fiber context switch in Crystal is achieved by calling a symbol (which + # must never be inlined) that will push the callee-saved registers (sometimes + # FPU registers and others) on the stack, saving the current stack pointer at + # location pointed by `current_stack_ptr` (the current fiber is now paused) + # then loading the `dest_stack_ptr` pointer into the stack pointer register + # and popping previously saved registers from the stack. Upon return from the + # symbol the new fiber is resumed since we returned/jumped to the calling + # symbol. + # + # Details are arch-specific. For example: + # - which registers must be saved, the callee-saved are sometimes enough (X86) + # but some archs need to save the FPU register too (ARMHF); + # - a simple return may be enough (X86), but sometimes an explicit jump is + # required to not confuse the stack unwinder (ARM); + # - and more. + # + # For the initial resume, the register holding the first parameter must be set + # (see makecontext below) and thus must also be saved/restored when swapping + # the context. + # + # def self.swapcontext(current_context : Context*, old_context : Context*) : Nil + # end + + # :nodoc: + # + # Initializes `@context`, reserves and initializes space on the stack for the + # initial context that must call the *fiber_main* proc, passing `self` as its + # first argument. + # + # def makecontext(stack_ptr : Void*, fiber_main : Fiber ->) : Nil + # end +end + +# Load the arch-specific methods to create a context and to swap from one +# context to another one. There are two methods: `Fiber#makecontext` and +# `Fiber.swapcontext`. +require "./context/*" diff --git a/src/fiber/context/aarch64.cr b/src/fiber/context/aarch64.cr new file mode 100644 index 000000000000..abe2af63cd22 --- /dev/null +++ b/src/fiber/context/aarch64.cr @@ -0,0 +1,79 @@ +{% skip_file unless flag?(:aarch64) %} + +class Fiber + # :nodoc: + def makecontext(stack_ptr, fiber_main) : Nil + # in ARMv8, the context switch push/pop 12 registers and 8 FPU registers, + # and one more to store the argument of `fiber_main` (+ alignment), we thus + # reserve space for 22 pointers: + @context.stack_top = (stack_ptr - 22).as(Void*) + @context.resumable = 1 + + stack_ptr[-2] = self.as(Void*) # x0 (r0): puts `self` as first argument for `fiber_main` + stack_ptr[-14] = fiber_main.pointer # x30 (lr): initial `resume` will `ret` to this address + end + + # :nodoc: + @[NoInline] + @[Naked] + def self.swapcontext(current_context, new_context) : Nil + # adapted from https://github.com/ldc-developers/druntime/blob/ldc/src/core/threadasm.S + # + # preserve/restore AAPCS64 registers: + # x19-x28 5.1.1 64-bit callee saved + # x29 fp, or possibly callee saved reg - depends on platform choice 5.2.3) + # x30 lr + # x0 self argument (initial call) + # d8-d15 5.1.2 says callee only must save bottom 64-bits (the "d" regs) + # + # ARM assembly requires integer literals to be moved to a register before + # being stored at an address; we use x19 as a scratch register that will be + # overwritten by the new context. + # + # AArch64 assembly also requires a register to load/store the stack top + # pointer. We use x19 as a scratch register again. + # + # Eventually reset LR to zero to avoid the ARM unwinder to mistake the + # context switch as a regular call. + asm(" + stp d15, d14, [sp, #-22*8]! + stp d13, d12, [sp, #2*8] + stp d11, d10, [sp, #4*8] + stp d9, d8, [sp, #6*8] + stp x30, x29, [sp, #8*8] // lr, fp + stp x28, x27, [sp, #10*8] + stp x26, x25, [sp, #12*8] + stp x24, x23, [sp, #14*8] + stp x22, x21, [sp, #16*8] + stp x20, x19, [sp, #18*8] + stp x0, x1, [sp, #20*8] // push 1st argument (+ alignment) + + mov x19, sp // current_context.stack_top = sp + str x19, [$0, #0] + mov x19, #1 // current_context.resumable = 1 + str x19, [$0, #8] + + mov x19, #0 // new_context.resumable = 0 + str x19, [$1, #8] + ldr x19, [$1, #0] // sp = new_context.stack_top (x19) + mov sp, x19 + + ldp x0, x1, [sp, #20*8] // pop 1st argument (+ alignement) + ldp x20, x19, [sp, #18*8] + ldp x22, x21, [sp, #16*8] + ldp x24, x23, [sp, #14*8] + ldp x26, x25, [sp, #12*8] + ldp x28, x27, [sp, #10*8] + ldp x30, x29, [sp, #8*8] // lr, fp + ldp d9, d8, [sp, #6*8] + ldp d11, d10, [sp, #4*8] + ldp d13, d12, [sp, #2*8] + ldp d15, d14, [sp], #22*8 + + // avoid a stack corruption that will confuse the unwinder + mov x16, x30 // save lr + mov x30, #0 // reset lr + br x16 // jump to new pc value + " :: "r"(current_context), "r"(new_context)) + end +end diff --git a/src/fiber/context/arm.cr b/src/fiber/context/arm.cr new file mode 100644 index 000000000000..96f7c296ade5 --- /dev/null +++ b/src/fiber/context/arm.cr @@ -0,0 +1,72 @@ +{% skip_file unless flag?(:arm) %} + +class Fiber + # :nodoc: + def makecontext(stack_ptr, fiber_main) : Void* + # in ARMv6 / ARVMv7, the context switch push/pop 8 registers, add one more + # to store the argument of `fiber_main`, and 8 64-bit FPU registers if a FPU + # is present, we thus reserve space for 9 or 25 pointers: + {% if flag?(:armhf) %} + @context.stack_top = (stack_ptr - 25).as(Void*) + {% else %} + @context.stack_top = (stack_ptr - 9).as(Void*) + {% end %} + @context.resumable = 1 + + stack_ptr[0] = fiber_main.pointer # lr: initial `resume` will `ret` to this address + stack_ptr[-9] = self.as(Void*) # r0: puts `self` as first argument for `fiber_main` + end + + # :nodoc: + @[NoInline] + @[Naked] + def self.swapcontext(current_context, new_context) : Nil + # ARM assembly requires integer literals to be moved to a register before + # being stored at an address; we use r4 as a scratch register that will be + # overwritten by the new context. + # + # Eventually reset LR to zero to avoid the ARM unwinder to mistake the + # context switch as a regular call. + + {% if flag?(:armhf) %} + asm(" + // declare the presence of a conservative FPU to the ASM compiler + .fpu vfp + + stmdb sp!, {r0, r4-r11, lr} // push 1st argument + callee-saved registers + vstmdb sp!, {d8-d15} // push FPU registers + str sp, [$0, #0] // current_context.stack_top = sp + mov r4, #1 // current_context.resumable = 1 + str r4, [$0, #4] + + mov r4, #0 // new_context.resumable = 0 + str r4, [$1, #4] + ldr sp, [$1, #0] // sp = new_context.stack_top + vldmia sp!, {d8-d15} // pop FPU registers + ldmia sp!, {r0, r4-r11, lr} // pop 1st argument + calleed-saved registers + + // avoid a stack corruption that will confuse the unwinder + mov r1, lr + mov lr, #0 + mov pc, r1 + " :: "r"(current_context), "r"(new_context)) + {% elsif flag?(:arm) %} + asm(" + stmdb sp!, {r0, r4-r11, lr} // push 1st argument + calleed-saved registers + str sp, [$0, #0] // current_context.stack_top = sp + mov r4, #1 // current_context.resumable = 1 + str r4, [$0, #4] + + mov r4, #0 // new_context.resumable = 0 + str r4, [$1, #4] + ldr sp, [$1, #0] // sp = new_context.stack_top + ldmia sp!, {r0, r4-r11, lr} // pop 1st argument + calleed-saved registers + + // avoid a stack corruption that will confuse the unwinder + mov r1, lr + mov lr, #0 + mov pc, r1 + " :: "r"(current_context), "r"(new_context)) + {% end %} + end +end diff --git a/src/fiber/context/i686.cr b/src/fiber/context/i686.cr new file mode 100644 index 000000000000..8bb02e8bd07f --- /dev/null +++ b/src/fiber/context/i686.cr @@ -0,0 +1,37 @@ +{% skip_file unless flag?(:i686) %} + +class Fiber + # :nodoc: + def makecontext(stack_ptr, fiber_main) + # in IA32 (x86), the context switch push/pop 4 registers, and we need two + # more to store the argument for `fiber_main` and keep the stack aligned on + # 16 bytes, we thus reserve space for 6 pointers: + @context.stack_top = (stack_ptr - 6).as(Void*) + @context.resumable = 1 + + stack_ptr[0] = self.as(Void*) # first argument passed on the stack + stack_ptr[-1] = Pointer(Void).null # empty space to keep the stack alignment (16 bytes) + stack_ptr[-2] = fiber_main.pointer # initial `resume` will `ret` to this address + end + + # :nodoc: + @[NoInline] + @[Naked] + def self.swapcontext(current_context, new_context) : Nil + asm(" + pushl %edi // push 1st argument (because of initial resume) + pushl %ebx // push callee-saved registers on the stack + pushl %ebp + pushl %esi + movl %esp, 0($0) // current_context.stack_top = %esp + movl $$1, 4($0) // current_context.resumable = 1 + + movl $$0, 4($1) // new_context.resumable = 0 + movl 0($1), %esp // %esp = new_context.stack_top + popl %esi // pop callee-saved registers from the stack + popl %ebp + popl %ebx + popl %edi // pop first argument (for initial resume) + " :: "r"(current_context), "r"(new_context)) + end +end diff --git a/src/fiber/context/x86_64.cr b/src/fiber/context/x86_64.cr new file mode 100644 index 000000000000..951bb88ad7c7 --- /dev/null +++ b/src/fiber/context/x86_64.cr @@ -0,0 +1,41 @@ +{% skip_file unless flag?(:x86_64) %} + +class Fiber + # :nodoc: + def makecontext(stack_ptr, fiber_main) : Nil + # in x86-64, the context switch push/pop 6 registers + the return address + # that is left on the stack, we thus reserve space for 7 pointers: + @context.stack_top = (stack_ptr - 7).as(Void*) + @context.resumable = 1 + + stack_ptr[0] = fiber_main.pointer # %rbx: initial `resume` will `ret` to this address + stack_ptr[-1] = self.as(Void*) # %rdi: puts `self` as first argument for `fiber_main` + end + + # :nodoc: + @[NoInline] + @[Naked] + def self.swapcontext(current_context, new_context) : Nil + asm(" + pushq %rdi // push 1st argument (because of initial resume) + pushq %rbx // push callee-saved registers on the stack + pushq %rbp + pushq %r12 + pushq %r13 + pushq %r14 + pushq %r15 + movq %rsp, 0($0) // current_context.stack_top = %rsp + movl $$1, 8($0) // current_context.resumable = 1 + + movl $$0, 8($1) // new_context.resumable = 0 + movq 0($1), %rsp // %rsp = new_context.stack_top + popq %r15 // pop callee-saved registers from the stack + popq %r14 + popq %r13 + popq %r12 + popq %rbp + popq %rbx + popq %rdi // pop 1st argument (for initial resume) + " :: "r"(current_context), "r"(new_context)) + end +end diff --git a/src/fiber/i686.cr b/src/fiber/i686.cr deleted file mode 100644 index 09e9e42dc373..000000000000 --- a/src/fiber/i686.cr +++ /dev/null @@ -1,33 +0,0 @@ -{% skip_file unless flag?(:i686) %} - -class Fiber - # :nodoc: - def makecontext(stack_ptr, fiber_main) - # in IA32, the context switch push/pops 4 registers. - # add two more to store the argument of `fiber_main`: - @stack_top = (stack_ptr - 6).as(Void*) - - stack_ptr[0] = self.as(Void*) # first argument passed on the stack - stack_ptr[-1] = Pointer(Void).null # empty space to keep the stack alignment (16 bytes) - stack_ptr[-2] = fiber_main.pointer # initial `resume` will `ret` to this address - end - - # :nodoc: - @[NoInline] - @[Naked] - def self.swapcontext(current, to) : Nil - asm(" - pushl %edi - pushl %ebx - pushl %ebp - pushl %esi - movl %esp, ($0) - - movl $1, %esp - popl %esi - popl %ebp - popl %ebx - popl %edi" - :: "r"(current), "r"(to)) - end -end diff --git a/src/fiber/x86_64.cr b/src/fiber/x86_64.cr deleted file mode 100644 index 7ac67e9bd875..000000000000 --- a/src/fiber/x86_64.cr +++ /dev/null @@ -1,37 +0,0 @@ -{% skip_file unless flag?(:x86_64) %} - -class Fiber - # :nodoc: - def makecontext(stack_ptr, fiber_main) - # in x86-64, the context switch push/pop 7 registers - @stack_top = (stack_ptr - 7).as(Void*) - - stack_ptr[0] = fiber_main.pointer # initial `resume` will `ret` to this address - stack_ptr[-1] = self.as(Void*) # this will be `pop` into %rdi (first argument) - end - - # :nodoc: - @[NoInline] - @[Naked] - def self.swapcontext(current, to) : Nil - asm(" - pushq %rdi - pushq %rbx - pushq %rbp - pushq %r12 - pushq %r13 - pushq %r14 - pushq %r15 - movq %rsp, ($0) - - movq $1, %rsp - popq %r15 - popq %r14 - popq %r13 - popq %r12 - popq %rbp - popq %rbx - popq %rdi" - :: "r"(current), "r"(to)) - end -end From aed2edbaecd4f8d6efd7f2dd025bd0f6a7069706 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Fri, 19 Oct 2018 18:56:52 +0200 Subject: [PATCH 2/4] Extract Fiber::StackPool Keeps the pool of free stacks to recycle out of Fiber itself, and makes it thread-safe using simple thread mutexes. A better algorithm could eventually be used (e.g. nonblocking or flat-combining) to speed up spawning fibers in parallel. --- src/fiber.cr | 18 ++++-------------- src/fiber/stack_pool.cr | 42 +++++++++++++++++++++++++++++++++++++++++ src/kernel.cr | 2 +- 3 files changed, 47 insertions(+), 15 deletions(-) create mode 100644 src/fiber/stack_pool.cr diff --git a/src/fiber.cr b/src/fiber.cr index 2152c7dffb34..672200c7dce2 100644 --- a/src/fiber.cr +++ b/src/fiber.cr @@ -1,6 +1,7 @@ require "c/sys/mman" require "thread/linked_list" -require "./fiber/context" +require "fiber/context" +require "fiber/stack_pool" # :nodoc: @[NoInline] @@ -21,7 +22,6 @@ class Fiber STACK_SIZE = 8 * 1024 * 1024 @@fibers = Thread::LinkedList(Fiber).new - @@stack_pool = [] of Void* @context : Context @stack : Void* @@ -71,7 +71,7 @@ class Fiber end protected def self.allocate_stack - if pointer = @@stack_pool.pop? + if pointer = stack_pool.pop? return pointer end @@ -91,16 +91,6 @@ class Fiber end end - # :nodoc: - def self.stack_pool_collect - return if @@stack_pool.size == 0 - free_count = @@stack_pool.size > 1 ? @@stack_pool.size / 2 : 1 - free_count.times do - stack = @@stack_pool.pop - LibC.munmap(stack, Fiber::STACK_SIZE) - end - end - # :nodoc: def run @proc.call @@ -113,7 +103,7 @@ class Fiber ex.inspect_with_backtrace(STDERR) STDERR.flush ensure - @@stack_pool << @stack + Fiber.stack_pool << @stack # Remove the current fiber from the linked list @@fibers.delete(self) diff --git a/src/fiber/stack_pool.cr b/src/fiber/stack_pool.cr new file mode 100644 index 000000000000..47cdfae7d631 --- /dev/null +++ b/src/fiber/stack_pool.cr @@ -0,0 +1,42 @@ +class Fiber + # :nodoc: + class StackPool + def initialize + @deque = Deque(Void*).new + @mutex = Thread::Mutex.new + end + + # Removes and frees at most *count* stacks from the top of the list, + # returning memory to the operating system. + def collect(count = lazy_size / 2) + count.times do + if stack = @deque.shift? + LibC.munmap(stack, Fiber::STACK_SIZE) + else + return + end + end + end + + # Removes a stack from the bottom of the list. + def pop? + @mutex.synchronize { @deque.pop? } + end + + # Appends a stack to the bottom of the list. + def <<(stack) + @mutex.synchronize { @deque.push(stack) } + end + + # Returns the approximated size of the pool. It may be equal or slightly + # bigger or smaller than the actual size. + def lazy_size + @deque.size + end + end + + # :nodoc: + def self.stack_pool + @@stack_pool ||= StackPool.new + end +end diff --git a/src/kernel.cr b/src/kernel.cr index 173f368d6876..6c50ca73491c 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -545,7 +545,7 @@ end spawn do loop do sleep 5 - Fiber.stack_pool_collect + Fiber.stack_pool.collect end end From 824a2dd79c8fab31eab13a719dcf6968b47fb73f Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Tue, 5 Feb 2019 23:24:03 +0100 Subject: [PATCH 3/4] Fix: execute select actions inside their own fiber In some scenarios, for example concurrent send actions to buffered channels, it was possible to enqueue the current fiber *many* times which led to segfaults, trying to resume a dead fiber or to switch to the currently running fiber. All channels actions are now performed inside a dedicated fiber, so the wait operation is safe (on a single thread). Still not MT-safe: select actions need to compete on an atomic flag when ready, and be resilient to a failed execution (e.g. another thread already sent/receive from the channel). --- spec/std/concurrent/select_spec.cr | 50 +++++++++++++++- src/channel.cr | 93 ++++++++++++++++++++++-------- 2 files changed, 119 insertions(+), 24 deletions(-) diff --git a/spec/std/concurrent/select_spec.cr b/spec/std/concurrent/select_spec.cr index cd598f1b0e77..233e1a0528db 100644 --- a/spec/std/concurrent/select_spec.cr +++ b/spec/std/concurrent/select_spec.cr @@ -6,7 +6,7 @@ private def yield_to(fiber) end describe "select" do - it "select many receviers" do + it "select many receivers" do ch1 = Channel(Int32).new ch2 = Channel(Int32).new res = [] of Int32 @@ -72,6 +72,35 @@ describe "select" do res.should eq (0...10).to_a end + it "select else clause and cancel other clauses" do + ch1 = Channel::Buffered(Int32).new(1) + ch2 = Channel::Buffered(Int32).new(1) + + select + when ch1.receive + got = 1 + when ch2.receive + got = 2 + else + got = -1 + end + + got.should eq(-1) + + spawn do + ch1.send(1) + ch2.send(2) + ch1.close + ch2.close + end + + ch1.receive.should eq(1) + ch1.receive?.should be_nil + + ch2.receive.should eq(2) + ch2.receive?.should be_nil + end + it "select should work with send which started before receive, fixed #3862" do ch1 = Channel(Int32).new ch2 = Channel(Int32).new @@ -100,4 +129,23 @@ describe "select" do sleep x.should eq 1 end + + it "won't enqueue a dead/running fiber, fixed #3900" do + ch = Channel::Buffered(Int32).new(1) + + spawn do + ch.send(1) + + select + when ch.send(1) + when ch.send(2) + end + + ch.close + end + + ch.receive.should eq(1) + ch.receive.should eq(1) + ch.receive?.should be_nil + end end diff --git a/src/channel.cr b/src/channel.cr index f283d3971b0b..26d24ea63c41 100644 --- a/src/channel.cr +++ b/src/channel.cr @@ -2,10 +2,13 @@ require "fiber" abstract class Channel(T) module SelectAction + getter? canceled = false + getter? waiting = false + abstract def ready? abstract def execute - abstract def wait - abstract def unwait + abstract def wait : Bool + abstract def unwait(fiber : Fiber) end class ClosedError < Exception @@ -61,16 +64,16 @@ abstract class Channel(T) @receivers << Fiber.current end - protected def unwait_for_receive - @receivers.delete Fiber.current + protected def unwait_for_receive(fiber) + @receivers.delete fiber end protected def wait_for_send @senders << Fiber.current end - protected def unwait_for_send - @senders.delete Fiber.current + protected def unwait_for_send(fiber) + @senders.delete fiber end protected def raise_if_closed @@ -94,26 +97,66 @@ abstract class Channel(T) nil end + # :nodoc: def self.select(*ops : SelectAction) self.select ops end + # :nodoc: + # + # Executes all operations inside its own fiber to wait in. Postpones the fiber + # execution so the fibers' array will always be filled with all fibers, and + # any ready operation can cancel all other fibers ASAP. def self.select(ops : Tuple | Array, has_else = false) - loop do - ops.each_with_index do |op, index| - if op.ready? - result = op.execute - return index, result - end + # fast path: check if any clause is ready + ops.each_with_index do |op, i| + if op.ready? + return {i, op.execute} end + end + + if has_else + return {ops.size, nil} + end - if has_else - return ops.size, nil + # slow path: spawn fibers to wait on each clause + main = Fiber.current + fibers = Array(Fiber).new(ops.size) + index = -1 + value = nil + + ops.each_with_index do |op, i| + fibers << Fiber.new(name: i.to_s) do + loop do + break if op.canceled? + + if op.ready? + # cancel other fibers before executing the op, which could switch + # the current context: + cancel_select_actions(ops, fibers, i) + index, value = i, op.execute + Crystal::Scheduler.enqueue(main) + break + end + + op.wait + Crystal::Scheduler.reschedule + end end + end - ops.each &.wait - Crystal::Scheduler.reschedule - ops.each &.unwait + Crystal::Scheduler.enqueue(fibers) + Crystal::Scheduler.reschedule + + {index, value} + end + + private def self.cancel_select_actions(ops, fibers, running_index) + ops.each_with_index do |op, i| + next if i == running_index + fiber = fibers[i] + op.unwait(fiber) + Crystal::Scheduler.enqueue(fiber) if op.waiting? end end @@ -128,7 +171,7 @@ abstract class Channel(T) end # :nodoc: - struct ReceiveAction(C) + class ReceiveAction(C) include SelectAction def initialize(@channel : C) @@ -144,15 +187,17 @@ abstract class Channel(T) def wait @channel.wait_for_receive + @waiting = true end - def unwait - @channel.unwait_for_receive + def unwait(fiber) + @canceled = true + @channel.unwait_for_receive(fiber) end end # :nodoc: - struct SendAction(C, T) + class SendAction(C, T) include SelectAction def initialize(@channel : C, @value : T) @@ -168,10 +213,12 @@ abstract class Channel(T) def wait @channel.wait_for_send + @waiting = true end - def unwait - @channel.unwait_for_send + def unwait(fiber) + @canceled = true + @channel.unwait_for_send(fiber) end end end From 7e289970c17be2a1c701c18d8f00cc1b0e7780f0 Mon Sep 17 00:00:00 2001 From: Julien Portalier Date: Mon, 11 Feb 2019 10:50:07 +0100 Subject: [PATCH 4/4] fixup: blocking the fiber is the responsibility of SelectAction#wait --- src/channel.cr | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/channel.cr b/src/channel.cr index 26d24ea63c41..0251450d291f 100644 --- a/src/channel.cr +++ b/src/channel.cr @@ -140,7 +140,6 @@ abstract class Channel(T) end op.wait - Crystal::Scheduler.reschedule end end end @@ -188,6 +187,7 @@ abstract class Channel(T) def wait @channel.wait_for_receive @waiting = true + Crystal::Scheduler.reschedule end def unwait(fiber) @@ -214,6 +214,7 @@ abstract class Channel(T) def wait @channel.wait_for_send @waiting = true + Crystal::Scheduler.reschedule end def unwait(fiber)