Skip to content

Commit

Permalink
Merge pull request #3225 from armanbilge/feature/batching-macrotask-e…
Browse files Browse the repository at this point in the history
…xecutor

Introduce a `BatchingMacrotaskExecutor`
  • Loading branch information
djspiewak authored Jan 28, 2023
2 parents 9f7abed + 12afda7 commit e4f2b71
Show file tree
Hide file tree
Showing 14 changed files with 486 additions and 71 deletions.
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,11 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
// mystery filters that became required in 3.4.0
ProblemFilters.exclude[DirectMissingMethodProblem](
"cats.effect.tracing.TracingConstants.*"),
// introduced by #3225, which added the BatchingMacrotaskExecutor
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.FiberAwareExecutionContext"),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"cats.effect.unsafe.ES2021FiberMonitor.this"),
// introduced by #3324, which specialized CallbackStack for JS
// internal API change
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("cats.effect.CallbackStack")
Expand Down
21 changes: 15 additions & 6 deletions core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package cats.effect

import cats.effect.metrics.JsCpuStarvationMetrics
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._

import scala.concurrent.CancellationException
Expand Down Expand Up @@ -167,6 +168,16 @@ trait IOApp {
*/
protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig()

/**
* Configures the action to perform when unhandled errors are caught by the runtime. By
* default, this simply delegates to [[cats.effect.std.Console!.printStackTrace]]. It is safe
* to perform any `IO` action within this handler; it will not block the progress of the
* runtime. With that said, some care should be taken to avoid raising unhandled errors as a
* result of handling unhandled errors, since that will result in the obvious chaos.
*/
protected def reportFailure(err: Throwable): IO[Unit] =
Console[IO].printStackTrace(err)

/**
* The entry point for your application. Will be called by the runtime when the process is
* started. If the underlying runtime supports it, any arguments passed to the process will be
Expand All @@ -188,12 +199,10 @@ trait IOApp {
import unsafe.IORuntime

val installed = IORuntime installGlobal {
IORuntime(
IORuntime.defaultComputeExecutionContext,
IORuntime.defaultComputeExecutionContext,
IORuntime.defaultScheduler,
() => (),
runtimeConfig)
val compute = IORuntime.createBatchingMacrotaskExecutor(reportFailure = t =>
reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime))

IORuntime(compute, compute, IORuntime.defaultScheduler, () => (), runtimeConfig)
}

_runtime = IORuntime.global
Expand Down
23 changes: 23 additions & 0 deletions core/js/src/main/scala/cats/effect/Platform.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect

private object Platform {
final val isJs = true
final val isJvm = false
final val isNative = false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect
package unsafe

import cats.effect.tracing.TracingConstants

import org.scalajs.macrotaskexecutor.MacrotaskExecutor

import scala.collection.mutable
import scala.concurrent.ExecutionContextExecutor
import scala.scalajs.{js, LinkingInfo}
import scala.util.control.NonFatal

/**
* An `ExecutionContext` that improves throughput by providing a method to `schedule` fibers to
* execute in batches, instead of one task per event loop iteration. This optimization targets
* the typical scenario where a UI or I/O event handler starts/resumes a small number of
* short-lived fibers and then yields to the event loop.
*
* This `ExecutionContext` also maintains a fiber bag in development mode to enable fiber dumps.
*
* @param batchSize
* the maximum number of batched runnables to execute before yielding to the event loop
*/
private[effect] final class BatchingMacrotaskExecutor(
batchSize: Int,
reportFailure0: Throwable => Unit
) extends ExecutionContextExecutor {

private[this] val queueMicrotask: js.Function1[js.Function0[Any], Any] =
if (js.typeOf(js.Dynamic.global.queueMicrotask) == "function")
js.Dynamic.global.queueMicrotask.asInstanceOf[js.Function1[js.Function0[Any], Any]]
else {
val resolved = js.Dynamic.global.Promise.resolved(())
task => resolved.`then`(task)
}

/**
* Whether the `executeBatchTask` needs to be rescheduled
*/
private[this] var needsReschedule = true
private[this] val fibers = new JSArrayQueue[IOFiber[_]]

private[this] object executeBatchTaskRunnable extends Runnable {
def run() = {
// do up to batchSize tasks
var i = 0
while (i < batchSize && !fibers.isEmpty()) {
val fiber = fibers.take()

if (LinkingInfo.developmentMode)
if (fiberBag ne null)
fiberBag -= fiber

try fiber.run()
catch {
case t if NonFatal(t) => reportFailure(t)
case t: Throwable => IOFiber.onFatalFailure(t)
}

i += 1
}

if (!fibers.isEmpty()) // we'll be right back after this (post) message
MacrotaskExecutor.execute(this)
else // the batch task will need to be rescheduled when more fibers arrive
needsReschedule = true

// yield to the event loop
}
}

private[this] val executeBatchTaskJSFunction: js.Function0[Any] =
() => executeBatchTaskRunnable.run()

/**
* Execute the `runnable` in the next iteration of the event loop.
*/
def execute(runnable: Runnable): Unit =
MacrotaskExecutor.execute(monitor(runnable))

/**
* Schedule the `fiber` for the next available batch. This is often the currently executing
* batch.
*/
def schedule(fiber: IOFiber[_]): Unit = {
if (LinkingInfo.developmentMode)
if (fiberBag ne null)
fiberBag += fiber

fibers.offer(fiber)

if (needsReschedule) {
needsReschedule = false
// start executing the batch immediately after the currently running task suspends
// this is safe b/c `needsReschedule` is set to `true` only upon yielding to the event loop
queueMicrotask(executeBatchTaskJSFunction)
()
}
}

def reportFailure(t: Throwable): Unit = reportFailure0(t)

def liveTraces(): Map[IOFiber[_], Trace] =
fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap

@inline private[this] def monitor(runnable: Runnable): Runnable =
if (LinkingInfo.developmentMode)
if (fiberBag ne null)
runnable match {
case r: IOFiber[_] =>
fiberBag += r
() => {
// We have to remove r _before_ running it, b/c it may be re-enqueued while running
// b/c JS is single-threaded, nobody can observe the bag while the fiber is running anyway
fiberBag -= r
r.run()
}
case _ => runnable
}
else runnable
else
runnable

private[this] val fiberBag =
if (LinkingInfo.developmentMode)
if (TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable)
mutable.Set.empty[IOFiber[_]]
else
null
else
null

}

This file was deleted.

10 changes: 5 additions & 5 deletions core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ private[effect] sealed abstract class FiberMonitor extends FiberMonitorShared {
*/
private final class ES2021FiberMonitor(
// A reference to the compute pool of the `IORuntime` in which this suspended fiber bag
// operates. `null` if the compute pool of the `IORuntime` is not a `FiberAwareExecutionContext`.
private[this] val compute: FiberAwareExecutionContext
// operates. `null` if the compute pool of the `IORuntime` is not a `BatchingMacrotaskExecutor`.
private[this] val compute: BatchingMacrotaskExecutor
) extends FiberMonitor {
private[this] val bag = new WeakBag[IOFiber[_]]()

Expand Down Expand Up @@ -102,9 +102,9 @@ private final class NoOpFiberMonitor extends FiberMonitor {
private[effect] object FiberMonitor {
def apply(compute: ExecutionContext): FiberMonitor = {
if (LinkingInfo.developmentMode && weakRefsAvailable) {
if (compute.isInstanceOf[FiberAwareExecutionContext]) {
val faec = compute.asInstanceOf[FiberAwareExecutionContext]
new ES2021FiberMonitor(faec)
if (compute.isInstanceOf[BatchingMacrotaskExecutor]) {
val bmec = compute.asInstanceOf[BatchingMacrotaskExecutor]
new ES2021FiberMonitor(bmec)
} else {
new ES2021FiberMonitor(null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,18 @@

package cats.effect.unsafe

import cats.effect.tracing.TracingConstants

import org.scalajs.macrotaskexecutor.MacrotaskExecutor

import scala.concurrent.ExecutionContext
import scala.scalajs.LinkingInfo

private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type =>

def defaultComputeExecutionContext: ExecutionContext =
if (LinkingInfo.developmentMode && TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable)
new FiberAwareExecutionContext(MacrotaskExecutor)
else
MacrotaskExecutor
createBatchingMacrotaskExecutor()

def createBatchingMacrotaskExecutor(
batchSize: Int = 64,
reportFailure: Throwable => Unit = _.printStackTrace()
): ExecutionContext =
new BatchingMacrotaskExecutor(batchSize, reportFailure)

def defaultScheduler: Scheduler = Scheduler.createDefaultScheduler()._1

Expand Down
72 changes: 72 additions & 0 deletions core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2020-2022 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.unsafe

import scala.scalajs.js

/**
* A JS-Array backed circular buffer FIFO queue. It is careful to grow the buffer only using
* `push` to avoid creating "holes" on V8 (this is a known shortcoming of the Scala.js
* `j.u.ArrayDeque` implementation).
*/
private final class JSArrayQueue[A] {

private[this] val buffer = js.Array[A](null.asInstanceOf[A])

private[this] var startIndex: Int = 0
private[this] var endIndex: Int = 1
private[this] var empty: Boolean = true

@inline def isEmpty(): Boolean = empty

@inline def take(): A = {
val a = buffer(startIndex)
buffer(startIndex) = null.asInstanceOf[A]
startIndex += 1
if (startIndex == endIndex)
empty = true
if (startIndex >= buffer.length)
startIndex = 0
a
}

@inline def offer(a: A): Unit = {
growIfNeeded()
endIndex += 1
if (endIndex > buffer.length)
endIndex = 1
buffer(endIndex - 1) = a
empty = false
}

@inline private[this] def growIfNeeded(): Unit =
if (!empty) { // empty queue always has capacity >= 1
if (startIndex == 0 && endIndex == buffer.length) {
buffer.push(null.asInstanceOf[A])
()
} else if (startIndex == endIndex) {
var i = 0
while (i < endIndex) {
buffer.push(buffer(i))
buffer(i) = null.asInstanceOf[A]
i += 1
}
endIndex = buffer.length
}
}

}
Loading

0 comments on commit e4f2b71

Please sign in to comment.