Skip to content

Commit

Permalink
Merge pull request #3219 from djspiewak/feature/integrated-timers
Browse files Browse the repository at this point in the history
Integrated timers
  • Loading branch information
djspiewak authored Jan 28, 2023
2 parents e4f2b71 + 5951e31 commit be6325d
Show file tree
Hide file tree
Showing 16 changed files with 549 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.benchmarks

import cats.effect.IO
import cats.effect.unsafe._
import cats.syntax.all._

import org.openjdk.jmh.annotations._

import scala.concurrent.duration._

import java.util.concurrent.TimeUnit

/**
* To do comparative benchmarks between versions:
*
* benchmarks/run-benchmark SleepBenchmark
*
* This will generate results in `benchmarks/results`.
*
* Or to run the benchmark from within sbt:
*
* jmh:run -i 10 -wi 10 -f 2 -t 1 cats.effect.benchmarks.SleepBenchmark
*
* Which means "10 iterations", "10 warm-up iterations", "2 forks", "1 thread". Please note that
* benchmarks should be usually executed at least in 10 iterations (as a rule of thumb), but
* more is better.
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MINUTES)
class SleepBenchmark {

@Param(Array("10000"))
var size: Int = _

def sleepBenchmark(implicit runtime: IORuntime): Int = {
def fiber(i: Int): IO[Int] =
IO.sleep(1.nanosecond).flatMap { _ =>
IO(i).flatMap { j =>
IO.sleep(1.nanosecond).flatMap { _ =>
if (j > 1000)
IO.sleep(1.nanosecond).flatMap(_ => IO.pure(j))
else
IO.sleep(1.nanosecond).flatMap(_ => fiber(j + 1))
}
}
}

List
.range(0, size)
.traverse(fiber(_).start)
.flatMap(_.traverse(_.joinWithNever))
.map(_.sum)
.unsafeRunSync()
}

@Benchmark
def sleep(): Int = {
import cats.effect.unsafe.implicits.global
sleepBenchmark
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import scala.concurrent.duration._

object SleepDrift extends IOApp.Simple {

override val runtimeConfig =
super.runtimeConfig.copy(cpuStarvationCheckInitialDelay = Duration.Inf)

val delayTwoMinutes = {
def loop(n: Int): IO[Unit] = {
if (n <= 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,36 +165,23 @@ class WorkStealingBenchmark {
(ExecutionContext.fromExecutor(executor), () => executor.shutdown())
}

val (scheduler, schedDown) = {
val executor = Executors.newSingleThreadScheduledExecutor { r =>
val t = new Thread(r)
t.setName("io-scheduler")
t.setDaemon(true)
t.setPriority(Thread.MAX_PRIORITY)
t
}
(Scheduler.fromScheduledExecutor(executor), () => executor.shutdown())
}

val compute =
new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
_.printStackTrace())
val compute = new WorkStealingThreadPool(
256,
"io-compute",
"io-blocker",
60.seconds,
_.printStackTrace())

val cancelationCheckThreshold =
System.getProperty("cats.effect.cancelation.check.threshold", "512").toInt

IORuntime(
compute,
blocking,
scheduler,
compute,
() => {
compute.shutdown()
blockDown()
schedDown()
},
IORuntimeConfig(
cancelationCheckThreshold,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cats.effect
package unsafe

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

// Can you imagine a thread pool on JS? Have fun trying to extend or instantiate
// this class. Unfortunately, due to the explicit branching, this type leaks
Expand All @@ -27,6 +28,13 @@ private[effect] sealed abstract class WorkStealingThreadPool private ()
def execute(runnable: Runnable): Unit
def reportFailure(cause: Throwable): Unit
private[effect] def reschedule(runnable: Runnable): Unit
private[effect] def sleepInternal(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit): Runnable
private[effect] def sleep(
delay: FiniteDuration,
task: Runnable,
fallback: Scheduler): Runnable
private[effect] def canExecuteBlockingCode(): Boolean
private[unsafe] def liveTraces(): (
Map[Runnable, Trace],
Expand Down
6 changes: 1 addition & 5 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,17 +322,13 @@ trait IOApp {
val (blocking, blockDown) =
IORuntime.createDefaultBlockingExecutionContext()

val (scheduler, schedDown) =
IORuntime.createDefaultScheduler()

IORuntime(
compute,
blocking,
scheduler,
compute,
{ () =>
compDown()
blockDown()
schedDown()
},
runtimeConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type
installGlobal {
val (compute, _) = createWorkStealingComputeThreadPool()
val (blocking, _) = createDefaultBlockingExecutionContext()
val (scheduler, _) = createDefaultScheduler()
IORuntime(compute, blocking, scheduler, () => (), IORuntimeConfig())

IORuntime(compute, blocking, compute, () => (), IORuntimeConfig())
}
}

Expand Down
79 changes: 79 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepCallback.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

import scala.concurrent.duration.FiniteDuration

import java.util.concurrent.atomic.AtomicBoolean

private final class SleepCallback private (
val triggerTime: Long,
private[this] var _callback: Right[Nothing, Unit] => Unit)
extends AtomicBoolean(true)
with Runnable {

def callback(r: Right[Nothing, Unit]): Unit = {
val cb = _callback
if (cb != null) {
cb(r)
}
}

override def run(): Unit = {
lazySet(false)
_callback = null // avoid memory leaks
}
}

private object SleepCallback {

/**
* Translated to Scala from:
* https://github.com/openjdk/jdk/blob/04a806ec86a388b8de31d42f904c4321beb69e14/src/java.base/share/classes/java/util/concurrent/ScheduledThreadPoolExecutor.java#L527-L547
*/
def create(
delay: FiniteDuration,
callback: Right[Nothing, Unit] => Unit,
now: Long,
sleepers: SleepersQueue): SleepCallback = {

def overflowFree(delay: Long, now: Long): Long =
if (sleepers.isEmpty) delay
else {
val head = sleepers.head()
val headDelay = head.triggerTime - now
if (headDelay < 0 && (delay - headDelay < 0))
Long.MaxValue + headDelay
else
delay
}

val triggerTime = {
val delayNanos = delay.toNanos

if (delayNanos < (Long.MaxValue >> 1))
now + delayNanos
else
now + overflowFree(delayNanos, now)
}

new SleepCallback(triggerTime, callback)
}

implicit val sleepCallbackReverseOrdering: Ordering[SleepCallback] =
Ordering.fromLessThan(_.triggerTime > _.triggerTime)
}
50 changes: 50 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/SleepersQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

import scala.collection.mutable.PriorityQueue

private final class SleepersQueue private () {
private[this] val queue: PriorityQueue[SleepCallback] = PriorityQueue.empty
private[this] var count: Int = 0

def isEmpty: Boolean =
count == 0

def nonEmpty: Boolean =
!isEmpty

def head(): SleepCallback =
queue.head

def +=(scb: SleepCallback): Unit = {
queue += scb
count += 1
}

def popHead(): Unit = {
queue.dequeue()
count -= 1
()
}

override def toString = s"SleepersQueue($queue, $count)"
}

private object SleepersQueue {
def empty: SleepersQueue = new SleepersQueue()
}
Loading

0 comments on commit be6325d

Please sign in to comment.