Skip to content

Commit

Permalink
Try to fix polling/parking interaction
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Apr 20, 2023
1 parent 144c049 commit 33d8ff5
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,16 @@ abstract class PollingSystem {
* currently hard-coded into every test framework, including MUnit, specs2, and Weaver.
*
* @return
* whether poll should be called again (i.e., there are more events to be polled)
* whether any events were polled
*/
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean

/**
* @return
* whether poll should be called again (i.e., there are more events to be polled)
*/
def needsPoll(poller: Poller): Boolean

def interrupt(targetThread: Thread, targetPoller: Poller): Unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
else selector.select()

if (selector.isOpen()) { // closing selector interrupts select
var polled = false

val ready = selector.selectedKeys().iterator()
while (ready.hasNext()) {
val key = ready.next()
Expand All @@ -59,6 +61,7 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
val cb = node.callback
if (cb != null) cb(value)
if (prev ne null) prev.next = next
polled = true
} else { // keep this node
prev = node
if (head eq null)
Expand All @@ -77,6 +80,9 @@ final class SelectorSystem private (provider: SelectorProvider) extends PollingS
} else false
}

def needsPoll(poller: Poller): Boolean =
!poller.selector.keys().isEmpty()

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = {
targetPoller.selector.wakeup()
()
Expand Down
2 changes: 2 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ object SleepSystem extends PollingSystem {
false
}

def needsPoll(poller: Poller): Boolean = false

def interrupt(targetThread: Thread, targetPoller: Poller): Unit =
LockSupport.unpark(targetThread)

Expand Down
42 changes: 23 additions & 19 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,17 @@ private final class WorkerThread(
def park(): Int = {
val tt = sleepers.peekFirstTriggerTime()
val nextState = if (tt == MIN_VALUE) { // no sleepers
parkLoop()

// After the worker thread has been unparked, look for work in the
// external queue.
3
if (parkLoop()) {
// we polled something, so go straight to local queue stuff
pool.transitionWorkerFromSearching(rnd)
4
} else {
// we were interrupted, look for more work in the external queue
3
}
} else {
if (parkUntilNextSleeper()) {
// we made it to the end of our sleeping, so go straight to local queue stuff
// we made it to the end of our sleeping/polling, so go straight to local queue stuff
pool.transitionWorkerFromSearching(rnd)
4
} else {
Expand All @@ -357,22 +360,24 @@ private final class WorkerThread(
}
}

def parkLoop(): Unit = {
var cont = true
while (cont && !done.get()) {
// returns true if polled event, false if unparked
def parkLoop(): Boolean = {
while (!done.get()) {
// Park the thread until further notice.
system.poll(_poller, -1, reportFailure)
val polled = system.poll(_poller, -1, reportFailure)

// the only way we can be interrupted here is if it happened *externally* (probably sbt)
if (isInterrupted())
pool.shutdown()
else
// Spurious wakeup check.
cont = parked.get()
else if (polled || !parked.get()) // Spurious wakeup check.
return polled
else // loop
()
}
false
}

// returns true if timed out, false if unparked
// returns true if timed out or polled event, false if unparked
@tailrec
def parkUntilNextSleeper(): Boolean = {
if (done.get()) {
Expand All @@ -382,22 +387,21 @@ private final class WorkerThread(
if (triggerTime == MIN_VALUE) {
// no sleeper (it was removed)
parkLoop()
true
} else {
val now = System.nanoTime()
val nanos = triggerTime - now

if (nanos > 0L) {
system.poll(_poller, nanos, reportFailure)
val polled = system.poll(_poller, nanos, reportFailure)

if (isInterrupted()) {
pool.shutdown()
false // we know `done` is `true`
} else {
if (parked.get()) {
// we were either awakened spuriously, or we timed out
if (triggerTime - System.nanoTime() <= 0) {
// we timed out
// we were either awakened spuriously, or we timed out or polled an event
if (polled || (triggerTime - System.nanoTime() <= 0)) {
// we timed out or polled an event
if (parked.getAndSet(false)) {
pool.doneSleeping()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ object EpollSystem extends PollingSystem {
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
poller.poll(nanos)

def needsPoll(poller: Poller): Boolean = poller.needsPoll()

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

final class GlobalPollingState private[EpollSystem] (register: (Poller => Unit) => Unit)
Expand Down Expand Up @@ -184,13 +186,16 @@ object EpollSystem extends PollingSystem {
false // nothing to do here
else {
val events = stackalloc[epoll_event](MaxEvents.toLong)
var polled = false

@tailrec
def processEvents(timeout: Int): Unit = {

val triggeredEvents = epoll_wait(epfd, events, MaxEvents, timeout)

if (triggeredEvents >= 0) {
polled = true

var i = 0
while (i < triggeredEvents) {
val event = events + i.toLong
Expand All @@ -215,6 +220,8 @@ object EpollSystem extends PollingSystem {
}
}

private[EpollSystem] def needsPoll(): Boolean = !handles.isEmpty()

private[EpollSystem] def register(
fd: Int,
reads: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ private[effect] final class EventLoopExecutorScheduler(pollEvery: Int, system: P
else
-1

val needsPoll = system.poll(poller, timeout, reportFailure)
system.poll(poller, timeout, reportFailure)

continue = needsPoll || !executeQueue.isEmpty() || !sleepQueue.isEmpty()
continue = !executeQueue.isEmpty() || !sleepQueue.isEmpty() || system.needsPoll(poller)
}

needsReschedule = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ object KqueueSystem extends PollingSystem {
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
poller.poll(nanos)

def needsPoll(poller: Poller): Boolean =
poller.needsPoll()

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

final class GlobalPollingState private[KqueueSystem] (
Expand Down Expand Up @@ -167,6 +170,7 @@ object KqueueSystem extends PollingSystem {
else {

val eventlist = stackalloc[kevent64_s](MaxEvents.toLong)
var polled = false

@tailrec
def processEvents(timeout: Ptr[timespec], changeCount: Int, flags: Int): Unit = {
Expand All @@ -183,6 +187,8 @@ object KqueueSystem extends PollingSystem {
)

if (triggeredEvents >= 0) {
polled = true

var i = 0
var event = eventlist
while (i < triggeredEvents) {
Expand Down Expand Up @@ -226,6 +232,7 @@ object KqueueSystem extends PollingSystem {
}
}

def needsPoll(): Boolean = !callbacks.isEmpty()
}

@nowarn212
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ abstract class PollingExecutorScheduler(pollEvery: Int)
new PollingSystem {
type GlobalPollingState = outer.type
type Poller = outer.type
private[this] var needsPoll = true
def makeGlobalPollingState(register: (Poller => Unit) => Unit): GlobalPollingState = outer
def makePoller(): Poller = outer
def closePoller(poller: Poller): Unit = ()
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean =
if (nanos == -1) poller.poll(Duration.Inf) else poller.poll(nanos.nanos)
def poll(poller: Poller, nanos: Long, reportFailure: Throwable => Unit): Boolean = {
needsPoll =
if (nanos == -1)
poller.poll(Duration.Inf)
else
poller.poll(nanos.nanos)
true
}
def needsPoll(poller: Poller) = needsPoll
def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ object SleepSystem extends PollingSystem {
false
}

def needsPoll(poller: Poller): Boolean = false

def interrupt(targetThread: Thread, targetPoller: Poller): Unit = ()

}

0 comments on commit 33d8ff5

Please sign in to comment.