From 1f7aa389ddc2e8c1e9be344e5490b1e81c31d63f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 5 Nov 2022 05:26:39 +0000 Subject: [PATCH 01/27] Introduce a `BatchingMacrotaskExecutor` --- .../unsafe/BatchingMacrotaskExecutor.scala | 63 +++++++++++++++++++ .../unsafe/IORuntimeCompanionPlatform.scala | 10 +-- 2 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala new file mode 100644 index 0000000000..713009cd59 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import org.scalajs.macrotaskexecutor.MacrotaskExecutor + +import scala.concurrent.ExecutionContextExecutor +import scala.util.control.NonFatal + +import java.util.ArrayDeque + +private final class BatchingMacrotaskExecutor(pollEvery: Int) extends ExecutionContextExecutor { + + private[this] var needsReschedule: Boolean = true + + private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque + + private[this] def executeTask = _executeTask + private[this] val _executeTask: Runnable = () => { + // do up to pollEvery tasks + var i = 0 + while (i < pollEvery && !executeQueue.isEmpty()) { + val runnable = executeQueue.poll() + try runnable.run() + catch { + case NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + i += 1 + } + + if (!executeQueue.isEmpty()) // we'll be right back after the (post) message + MacrotaskExecutor.execute(executeTask) + else + needsReschedule = true + } + + def reportFailure(t: Throwable): Unit = t.printStackTrace() + + def execute(runnable: Runnable): Unit = { + executeQueue.addLast(runnable) + if (needsReschedule) { + needsReschedule = false + MacrotaskExecutor.execute(executeTask) + } + } + +} diff --git a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 5c40f6e41f..5a6c1b6f44 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -18,18 +18,18 @@ package cats.effect.unsafe import cats.effect.tracing.TracingConstants -import org.scalajs.macrotaskexecutor.MacrotaskExecutor - import scala.concurrent.ExecutionContext import scala.scalajs.LinkingInfo private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type => - def defaultComputeExecutionContext: ExecutionContext = + def defaultComputeExecutionContext: ExecutionContext = { + val ec = new BatchingMacrotaskExecutor(64) if (LinkingInfo.developmentMode && TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable) - new FiberAwareExecutionContext(MacrotaskExecutor) + new FiberAwareExecutionContext(ec) else - MacrotaskExecutor + ec + } def defaultScheduler: Scheduler = Scheduler.createDefaultScheduler()._1 From 743caed33b54d30ea5231a5fb21419adb61b408e Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 28 Nov 2022 06:26:19 +0000 Subject: [PATCH 02/27] Use batching only for `IOFiber#schedule` --- .../src/main/scala/cats/effect/Platform.scala | 23 ++++++++++ .../unsafe/BatchingMacrotaskExecutor.scala | 44 ++++++------------- .../unsafe/BatchingMacrotaskExecutor.scala | 25 +++++++++++ .../src/main/scala/cats/effect/Platform.scala | 23 ++++++++++ .../src/main/scala/cats/effect/Platform.scala | 23 ++++++++++ .../src/main/scala/cats/effect/IOFiber.scala | 7 ++- 6 files changed, 112 insertions(+), 33 deletions(-) create mode 100644 core/js/src/main/scala/cats/effect/Platform.scala create mode 100644 core/jvm-native/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala create mode 100644 core/jvm/src/main/scala/cats/effect/Platform.scala create mode 100644 core/native/src/main/scala/cats/effect/Platform.scala diff --git a/core/js/src/main/scala/cats/effect/Platform.scala b/core/js/src/main/scala/cats/effect/Platform.scala new file mode 100644 index 0000000000..e8a47f3a00 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/Platform.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +private object Platform { + final val isJs = true + final val isJvm = false + final val isNative = false +} diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 713009cd59..c7e8dcd8d7 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -20,44 +20,26 @@ package unsafe import org.scalajs.macrotaskexecutor.MacrotaskExecutor import scala.concurrent.ExecutionContextExecutor -import scala.util.control.NonFatal +import scala.scalajs.concurrent.QueueExecutionContext -import java.util.ArrayDeque +private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) + extends ExecutionContextExecutor { -private final class BatchingMacrotaskExecutor(pollEvery: Int) extends ExecutionContextExecutor { + private[this] val MicrotaskExecutor = QueueExecutionContext.promises() - private[this] var needsReschedule: Boolean = true + private[this] var counter = 0 - private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque + def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) - private[this] def executeTask = _executeTask - private[this] val _executeTask: Runnable = () => { - // do up to pollEvery tasks - var i = 0 - while (i < pollEvery && !executeQueue.isEmpty()) { - val runnable = executeQueue.poll() - try runnable.run() - catch { - case NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } - i += 1 - } + def execute(runnable: Runnable): Unit = + MacrotaskExecutor.execute(runnable) - if (!executeQueue.isEmpty()) // we'll be right back after the (post) message - MacrotaskExecutor.execute(executeTask) + def schedule(runnable: Runnable): Unit = { + if (counter % batchSize == 0) + MacrotaskExecutor.execute(runnable) else - needsReschedule = true - } - - def reportFailure(t: Throwable): Unit = t.printStackTrace() - - def execute(runnable: Runnable): Unit = { - executeQueue.addLast(runnable) - if (needsReschedule) { - needsReschedule = false - MacrotaskExecutor.execute(executeTask) - } + MicrotaskExecutor.execute(runnable) + counter += 1 } } diff --git a/core/jvm-native/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/jvm-native/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala new file mode 100644 index 0000000000..1508fc4e31 --- /dev/null +++ b/core/jvm-native/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import scala.concurrent.ExecutionContext + +private[effect] sealed abstract class BatchingMacrotaskExecutor private () + extends ExecutionContext { + def schedule(runnable: Runnable): Unit +} diff --git a/core/jvm/src/main/scala/cats/effect/Platform.scala b/core/jvm/src/main/scala/cats/effect/Platform.scala new file mode 100644 index 0000000000..f0640d71d1 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/Platform.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +private object Platform { + final val isJs = false + final val isJvm = true + final val isNative = false +} diff --git a/core/native/src/main/scala/cats/effect/Platform.scala b/core/native/src/main/scala/cats/effect/Platform.scala new file mode 100644 index 0000000000..e9b3581f34 --- /dev/null +++ b/core/native/src/main/scala/cats/effect/Platform.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +private object Platform { + final val isJs = false + final val isJvm = false + final val isNative = true +} diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 120e74d00c..a9f051e8a6 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -1263,7 +1263,7 @@ private final class IOFiber[A]( } private[this] def rescheduleFiber(ec: ExecutionContext, fiber: IOFiber[_]): Unit = { - if (ec.isInstanceOf[WorkStealingThreadPool]) { + if (Platform.isJvm && ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] wstp.reschedule(fiber) } else { @@ -1272,9 +1272,12 @@ private final class IOFiber[A]( } private[this] def scheduleFiber(ec: ExecutionContext, fiber: IOFiber[_]): Unit = { - if (ec.isInstanceOf[WorkStealingThreadPool]) { + if (Platform.isJvm && ec.isInstanceOf[WorkStealingThreadPool]) { val wstp = ec.asInstanceOf[WorkStealingThreadPool] wstp.execute(fiber) + } else if (Platform.isJs && ec.isInstanceOf[BatchingMacrotaskExecutor]) { + val bmte = ec.asInstanceOf[BatchingMacrotaskExecutor] + bmte.schedule(fiber) } else { scheduleOnForeignEC(ec, fiber) } From 9f0c41803602023521f77d806d1a42cc4d9007a5 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 28 Nov 2022 07:07:12 +0000 Subject: [PATCH 03/27] Fix `BatchingMacrotaskExecutor` implementation --- .../effect/unsafe/BatchingMacrotaskExecutor.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index c7e8dcd8d7..2a029d023a 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -29,16 +29,21 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) private[this] var counter = 0 + private[this] val resetCounter: Runnable = () => counter = 0 + def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) def execute(runnable: Runnable): Unit = MacrotaskExecutor.execute(runnable) def schedule(runnable: Runnable): Unit = { - if (counter % batchSize == 0) - MacrotaskExecutor.execute(runnable) - else + if (counter < batchSize == 0) { MicrotaskExecutor.execute(runnable) + } else { + if (counter == batchSize) + MacrotaskExecutor.execute(resetCounter) + MacrotaskExecutor.execute(runnable) + } counter += 1 } From 6f777fea6bacc3dd7a192811beba0ea250553f06 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 28 Nov 2022 08:21:17 +0000 Subject: [PATCH 04/27] Wip test spec --- .../unsafe/BatchingMacrotaskExecutor.scala | 2 +- .../BatchingMacrotaskExecutorSpec.scala | 66 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 2a029d023a..bfb75a7bf3 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -37,7 +37,7 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) MacrotaskExecutor.execute(runnable) def schedule(runnable: Runnable): Unit = { - if (counter < batchSize == 0) { + if (counter < batchSize) { MicrotaskExecutor.execute(runnable) } else { if (counter == batchSize) diff --git a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala new file mode 100644 index 0000000000..eaa5e002af --- /dev/null +++ b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import cats.effect.std.CountDownLatch +import cats.syntax.all._ +import org.scalajs.macrotaskexecutor.MacrotaskExecutor + +import scala.concurrent.duration._ + +class BatchingMacrotaskExecutorSpec extends BaseSpec { + + "BatchingMacrotaskExecutor" should { + "batch fibers" in real { + CountDownLatch[IO](10).flatMap { latch => + IO.ref(List.empty[Int]).flatMap { ref => + List.range(0, 10).traverse_ { i => + val task = ref.update(_ :+ i) *> latch.release + val taskOnEc = if (i == 0) task.evalOn(MacrotaskExecutor) else task + taskOnEc.start + } *> + latch.await *> + ref.get.map(_ must beEqualTo(List.range(1, 10) :+ 0)) + } + } + } + + "cede to macrotasks" in real { + IO.ref(false) + .flatMap { ref => + ref.set(true).evalOn(MacrotaskExecutor).start *> + (ref.get, IO.cede, ref.get).tupled.start + } + .flatMap { f => + f.join.flatMap(_.embedNever).flatMap { + case (before, (), after) => + IO { + before must beFalse + after must beTrue + } + } + } + } + + // "limit batch sizes" in real { + // def go: IO[Unit] = IO.defer(go).both(IO.defer(go)).void + // go.timeoutTo(100.millis, IO.unit) *> IO(true must beTrue) + // } + } + +} From bfd876d3eacf4e37c1eaba2c46d6565a6f850e5b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Mon, 28 Nov 2022 19:51:33 +0000 Subject: [PATCH 05/27] Rearrange platform checks --- .../src/main/scala/cats/effect/IOFiber.scala | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index a9f051e8a6..50bd562f2b 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -1263,21 +1263,33 @@ private final class IOFiber[A]( } private[this] def rescheduleFiber(ec: ExecutionContext, fiber: IOFiber[_]): Unit = { - if (Platform.isJvm && ec.isInstanceOf[WorkStealingThreadPool]) { - val wstp = ec.asInstanceOf[WorkStealingThreadPool] - wstp.reschedule(fiber) + if (Platform.isJvm) { + if (ec.isInstanceOf[WorkStealingThreadPool]) { + val wstp = ec.asInstanceOf[WorkStealingThreadPool] + wstp.reschedule(fiber) + } else { + scheduleOnForeignEC(ec, fiber) + } } else { scheduleOnForeignEC(ec, fiber) } } private[this] def scheduleFiber(ec: ExecutionContext, fiber: IOFiber[_]): Unit = { - if (Platform.isJvm && ec.isInstanceOf[WorkStealingThreadPool]) { - val wstp = ec.asInstanceOf[WorkStealingThreadPool] - wstp.execute(fiber) - } else if (Platform.isJs && ec.isInstanceOf[BatchingMacrotaskExecutor]) { - val bmte = ec.asInstanceOf[BatchingMacrotaskExecutor] - bmte.schedule(fiber) + if (Platform.isJvm) { + if (ec.isInstanceOf[WorkStealingThreadPool]) { + val wstp = ec.asInstanceOf[WorkStealingThreadPool] + wstp.execute(fiber) + } else { + scheduleOnForeignEC(ec, fiber) + } + } else if (Platform.isJs) { + if (ec.isInstanceOf[BatchingMacrotaskExecutor]) { + val bmte = ec.asInstanceOf[BatchingMacrotaskExecutor] + bmte.schedule(fiber) + } else { + scheduleOnForeignEC(ec, fiber) + } } else { scheduleOnForeignEC(ec, fiber) } From c82ed914a578efe535b499d1e3d43c7d7c98d52c Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 05:55:44 +0000 Subject: [PATCH 06/27] Make batching ec fiber-aware --- .../unsafe/BatchingMacrotaskExecutor.scala | 37 +++++++++++++-- .../unsafe/FiberAwareExecutionContext.scala | 45 ------------------- .../cats/effect/unsafe/FiberMonitor.scala | 10 ++--- .../unsafe/IORuntimeCompanionPlatform.scala | 12 +---- 4 files changed, 41 insertions(+), 63 deletions(-) delete mode 100644 core/js/src/main/scala/cats/effect/unsafe/FiberAwareExecutionContext.scala diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index bfb75a7bf3..094034a676 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -17,9 +17,13 @@ package cats.effect package unsafe +import cats.effect.tracing.TracingConstants + import org.scalajs.macrotaskexecutor.MacrotaskExecutor +import scala.collection.mutable import scala.concurrent.ExecutionContextExecutor +import scala.scalajs.LinkingInfo import scala.scalajs.concurrent.QueueExecutionContext private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) @@ -34,17 +38,44 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) def execute(runnable: Runnable): Unit = - MacrotaskExecutor.execute(runnable) + MacrotaskExecutor.execute(monitor(runnable)) def schedule(runnable: Runnable): Unit = { if (counter < batchSize) { - MicrotaskExecutor.execute(runnable) + MicrotaskExecutor.execute(monitor(runnable)) } else { if (counter == batchSize) MacrotaskExecutor.execute(resetCounter) - MacrotaskExecutor.execute(runnable) + MacrotaskExecutor.execute(monitor(runnable)) } counter += 1 } + def liveTraces(): Map[IOFiber[_], Trace] = + fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap + + @inline private[this] def monitor(runnable: Runnable): Runnable = + if (LinkingInfo.developmentMode) + if (fiberBag ne null) + runnable match { + case r: IOFiber[_] => + fiberBag += r + () => { + // We have to remove r _before_ running it, b/c it may be re-enqueued while running + // B/c JS is single-threaded, nobody can observe the bag while it is running anyway + fiberBag -= r + r.run() + } + case _ => runnable + } + else runnable + else + runnable + + private[this] val fiberBag = + if (LinkingInfo.developmentMode && TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable) + mutable.Set.empty[IOFiber[_]] + else + null + } diff --git a/core/js/src/main/scala/cats/effect/unsafe/FiberAwareExecutionContext.scala b/core/js/src/main/scala/cats/effect/unsafe/FiberAwareExecutionContext.scala deleted file mode 100644 index 5a1a7eace8..0000000000 --- a/core/js/src/main/scala/cats/effect/unsafe/FiberAwareExecutionContext.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2020-2022 Typelevel - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cats.effect -package unsafe - -import scala.collection.mutable -import scala.concurrent.ExecutionContext - -private final class FiberAwareExecutionContext(ec: ExecutionContext) extends ExecutionContext { - - def liveTraces(): Map[IOFiber[_], Trace] = - fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap - - private[this] val fiberBag = mutable.Set.empty[IOFiber[_]] - - def execute(runnable: Runnable): Unit = runnable match { - case r: IOFiber[_] => - fiberBag += r - ec execute { () => - // We have to remove r _before_ running it, b/c it may be re-enqueued while running - // B/c JS is single-threaded, nobody can observe the bag while it is running anyway - fiberBag -= r - r.run() - } - - case r => r.run() - } - - def reportFailure(cause: Throwable): Unit = ec.reportFailure(cause) - -} diff --git a/core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala b/core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala index 038fbe3550..28589daf36 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/FiberMonitor.scala @@ -48,8 +48,8 @@ private[effect] sealed abstract class FiberMonitor extends FiberMonitorShared { */ private final class ES2021FiberMonitor( // A reference to the compute pool of the `IORuntime` in which this suspended fiber bag - // operates. `null` if the compute pool of the `IORuntime` is not a `FiberAwareExecutionContext`. - private[this] val compute: FiberAwareExecutionContext + // operates. `null` if the compute pool of the `IORuntime` is not a `BatchingMacrotaskExecutor`. + private[this] val compute: BatchingMacrotaskExecutor ) extends FiberMonitor { private[this] val bag = new WeakBag[IOFiber[_]]() @@ -102,9 +102,9 @@ private final class NoOpFiberMonitor extends FiberMonitor { private[effect] object FiberMonitor { def apply(compute: ExecutionContext): FiberMonitor = { if (LinkingInfo.developmentMode && weakRefsAvailable) { - if (compute.isInstanceOf[FiberAwareExecutionContext]) { - val faec = compute.asInstanceOf[FiberAwareExecutionContext] - new ES2021FiberMonitor(faec) + if (compute.isInstanceOf[BatchingMacrotaskExecutor]) { + val bmec = compute.asInstanceOf[BatchingMacrotaskExecutor] + new ES2021FiberMonitor(bmec) } else { new ES2021FiberMonitor(null) } diff --git a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index 5a6c1b6f44..c1bb8b770d 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -16,20 +16,12 @@ package cats.effect.unsafe -import cats.effect.tracing.TracingConstants - import scala.concurrent.ExecutionContext -import scala.scalajs.LinkingInfo private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type => - def defaultComputeExecutionContext: ExecutionContext = { - val ec = new BatchingMacrotaskExecutor(64) - if (LinkingInfo.developmentMode && TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable) - new FiberAwareExecutionContext(ec) - else - ec - } + def defaultComputeExecutionContext: ExecutionContext = + new BatchingMacrotaskExecutor(64) def defaultScheduler: Scheduler = Scheduler.createDefaultScheduler()._1 From 9189ee1b09e316afc8c78cd47eb05c26fc11f31b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 07:08:18 +0000 Subject: [PATCH 07/27] Restore deque-based implementation --- build.sbt | 2 +- .../unsafe/BatchingMacrotaskExecutor.scala | 40 ++++++++++++++----- project/plugins.sbt | 2 +- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/build.sbt b/build.sbt index 6143c60d72..69b5c03b85 100644 --- a/build.sbt +++ b/build.sbt @@ -41,7 +41,7 @@ ThisBuild / git.gitUncommittedChanges := { } } -ThisBuild / tlBaseVersion := "3.4" +ThisBuild / tlBaseVersion := "3.5" ThisBuild / tlUntaggedAreSnapshots := false ThisBuild / organization := "org.typelevel" diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 094034a676..bcef40868d 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -25,32 +25,52 @@ import scala.collection.mutable import scala.concurrent.ExecutionContextExecutor import scala.scalajs.LinkingInfo import scala.scalajs.concurrent.QueueExecutionContext +import scala.util.control.NonFatal + +import java.util.ArrayDeque private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) extends ExecutionContextExecutor { private[this] val MicrotaskExecutor = QueueExecutionContext.promises() - private[this] var counter = 0 + private[this] var needsReschedule = true + private[this] val tasks = new ArrayDeque[Runnable](batchSize) - private[this] val resetCounter: Runnable = () => counter = 0 + private[this] def executeBatchTask = _executeBatchTask + private[this] val _executeBatchTask: Runnable = () => { + // do up to batchSize tasks + var i = 0 + while (i < batchSize && !tasks.isEmpty()) { + val runnable = tasks.poll() + try runnable.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } + i += 1 + } - def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) + if (!tasks.isEmpty()) // we'll be right back after the (post) message + MacrotaskExecutor.execute(executeBatchTask) + else + needsReschedule = true + } def execute(runnable: Runnable): Unit = MacrotaskExecutor.execute(monitor(runnable)) def schedule(runnable: Runnable): Unit = { - if (counter < batchSize) { - MicrotaskExecutor.execute(monitor(runnable)) - } else { - if (counter == batchSize) - MacrotaskExecutor.execute(resetCounter) - MacrotaskExecutor.execute(monitor(runnable)) + tasks.addLast(runnable) + if (needsReschedule) { + needsReschedule = false + // run immediately after the current task suspends + MicrotaskExecutor.execute(runnable) } - counter += 1 } + def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) + def liveTraces(): Map[IOFiber[_], Trace] = fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap diff --git a/project/plugins.sbt b/project/plugins.sbt index 19da91a81f..14fcd1db97 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,7 +2,7 @@ libraryDependencies += "org.scala-js" %% "scalajs-env-selenium" % "1.1.1" addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.5.0-M5") -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.10.1") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.12.0") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.7") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") From e67900c0c003aa5d3fe3a8d2830adc96bb55fe56 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 07:15:58 +0000 Subject: [PATCH 08/27] Fix bugs --- .../scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index bcef40868d..483bd2e0c3 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -61,11 +61,11 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) MacrotaskExecutor.execute(monitor(runnable)) def schedule(runnable: Runnable): Unit = { - tasks.addLast(runnable) + tasks.addLast(monitor(runnable)) if (needsReschedule) { needsReschedule = false // run immediately after the current task suspends - MicrotaskExecutor.execute(runnable) + MicrotaskExecutor.execute(executeBatchTask) } } From baa1f6c7611ed704f1ddcf1d39ca050f71eb81f9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 07:36:58 +0000 Subject: [PATCH 09/27] Add comments to spec --- .../cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala index eaa5e002af..6b9da9583d 100644 --- a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala +++ b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration._ class BatchingMacrotaskExecutorSpec extends BaseSpec { "BatchingMacrotaskExecutor" should { - "batch fibers" in real { + "batch fibers" in real { // fails if running on MacrotaskExecutor CountDownLatch[IO](10).flatMap { latch => IO.ref(List.empty[Int]).flatMap { ref => List.range(0, 10).traverse_ { i => @@ -40,7 +40,7 @@ class BatchingMacrotaskExecutorSpec extends BaseSpec { } } - "cede to macrotasks" in real { + "cede to macrotasks" in real { // fails if running on Promises EC IO.ref(false) .flatMap { ref => ref.set(true).evalOn(MacrotaskExecutor).start *> From 5cdc0d1279eb39d8135fef86be91024c18fc5bc8 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 08:07:01 +0000 Subject: [PATCH 10/27] Fix "limit batch sizes" test --- .../unsafe/BatchingMacrotaskExecutorSpec.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala index 6b9da9583d..4489db78a9 100644 --- a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala +++ b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala @@ -57,10 +57,15 @@ class BatchingMacrotaskExecutorSpec extends BaseSpec { } } - // "limit batch sizes" in real { - // def go: IO[Unit] = IO.defer(go).both(IO.defer(go)).void - // go.timeoutTo(100.millis, IO.unit) *> IO(true must beTrue) - // } + "limit batch sizes" in real { + IO.ref(true).flatMap { continue => + def go: IO[Unit] = continue.get.flatMap { + IO.defer(go).both(IO.defer(go)).void.whenA(_) + } + val stop = IO.sleep(100.millis) *> continue.set(false) + go.both(stop) *> IO(true must beTrue) + } + } } } From 1aa3065ab54d0209d20aeb8497462482cdb6028a Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 08:07:49 +0000 Subject: [PATCH 11/27] Organize imports --- .../scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala index 4489db78a9..77b155d6be 100644 --- a/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala +++ b/tests/js/src/test/scala/cats/effect/unsafe/BatchingMacrotaskExecutorSpec.scala @@ -19,6 +19,7 @@ package unsafe import cats.effect.std.CountDownLatch import cats.syntax.all._ + import org.scalajs.macrotaskexecutor.MacrotaskExecutor import scala.concurrent.duration._ From a89d6e179be257a8153cef2960cf916d28768d36 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 08:37:54 +0000 Subject: [PATCH 12/27] Add MiMa filter --- build.sbt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index a9a53ab154..be401d7573 100644 --- a/build.sbt +++ b/build.sbt @@ -742,7 +742,10 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) ProblemFilters.exclude[Problem]("cats.effect.ArrayStack*"), // mystery filters that became required in 3.4.0 ProblemFilters.exclude[DirectMissingMethodProblem]( - "cats.effect.tracing.TracingConstants.*") + "cats.effect.tracing.TracingConstants.*"), + // introduced by #3225, which replaced it with the BatchingMacrotaskExecutor + ProblemFilters.exclude[MissingClassProblem]( + "cats.effect.unsafe.FiberAwareExecutionContext") ) }, mimaBinaryIssueFilters ++= { From 71059ac32dda2f0d1e84b1ae205055dde427b109 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 09:15:10 +0000 Subject: [PATCH 13/27] Comments and scaladocs --- .../unsafe/BatchingMacrotaskExecutor.scala | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 483bd2e0c3..ecc6f320be 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -29,11 +29,25 @@ import scala.util.control.NonFatal import java.util.ArrayDeque +/** + * An `ExecutionContext` that improves throughput by providing a method to `schedule` tasks to + * execute in batches, instead of one task per event loop iteration. This optimization targets + * the typical scenario where a UI or I/O event handler starts/resumes a small number of + * short-lived fibers and then yields to the event loop. + * + * This `ExecutionContext` also maintains a fiber bag in development mode to enable fiber dumps. + * + * @param batchSize + * the maximum number of batched runnables to execute before yielding to the event loop + */ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) extends ExecutionContextExecutor { private[this] val MicrotaskExecutor = QueueExecutionContext.promises() + /** + * Whether the `executeBatchTask` needs to be rescheduled + */ private[this] var needsReschedule = true private[this] val tasks = new ArrayDeque[Runnable](batchSize) @@ -51,20 +65,30 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) i += 1 } - if (!tasks.isEmpty()) // we'll be right back after the (post) message + if (!tasks.isEmpty()) // we'll be right back after this (post) message MacrotaskExecutor.execute(executeBatchTask) - else + else // this batch task will need to be rescheduled when more tasks arrive needsReschedule = true + + // yield to the event loop } + /** + * Execute the `runnable` in the next iteration of the event loop. + */ def execute(runnable: Runnable): Unit = MacrotaskExecutor.execute(monitor(runnable)) + /** + * Schedule the `runnable` for the next available batch. This is often the currently executing + * batch. + */ def schedule(runnable: Runnable): Unit = { tasks.addLast(monitor(runnable)) if (needsReschedule) { needsReschedule = false - // run immediately after the current task suspends + // start executing the batch immediately after the currently running task suspends + // this is safe b/c `needsReschedule` is set to `true` only upon yielding to the event loop MicrotaskExecutor.execute(executeBatchTask) } } @@ -82,7 +106,7 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) fiberBag += r () => { // We have to remove r _before_ running it, b/c it may be re-enqueued while running - // B/c JS is single-threaded, nobody can observe the bag while it is running anyway + // b/c JS is single-threaded, nobody can observe the bag while the fiber is running anyway fiberBag -= r r.run() } From c39ce00fc8adfb2f4e3c76c5a9c0d8a4c81d96ab Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 09:23:28 +0000 Subject: [PATCH 14/27] Another mima filter --- build.sbt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index be401d7573..d58c36a3ca 100644 --- a/build.sbt +++ b/build.sbt @@ -743,9 +743,11 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform) // mystery filters that became required in 3.4.0 ProblemFilters.exclude[DirectMissingMethodProblem]( "cats.effect.tracing.TracingConstants.*"), - // introduced by #3225, which replaced it with the BatchingMacrotaskExecutor + // introduced by #3225, which added the BatchingMacrotaskExecutor ProblemFilters.exclude[MissingClassProblem]( - "cats.effect.unsafe.FiberAwareExecutionContext") + "cats.effect.unsafe.FiberAwareExecutionContext"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "cats.effect.unsafe.ES2021FiberMonitor.this") ) }, mimaBinaryIssueFilters ++= { From 9af54691ab6cf3eaee00084e063d871c05aa3a81 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 09:29:25 +0000 Subject: [PATCH 15/27] More efficient fiber monitoring --- .../unsafe/BatchingMacrotaskExecutor.scala | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index ecc6f320be..d815b9ceea 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import java.util.ArrayDeque /** - * An `ExecutionContext` that improves throughput by providing a method to `schedule` tasks to + * An `ExecutionContext` that improves throughput by providing a method to `schedule` fibers to * execute in batches, instead of one task per event loop iteration. This optimization targets * the typical scenario where a UI or I/O event handler starts/resumes a small number of * short-lived fibers and then yields to the event loop. @@ -49,23 +49,29 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) * Whether the `executeBatchTask` needs to be rescheduled */ private[this] var needsReschedule = true - private[this] val tasks = new ArrayDeque[Runnable](batchSize) + private[this] val fibers = new ArrayDeque[IOFiber[_]](batchSize) private[this] def executeBatchTask = _executeBatchTask private[this] val _executeBatchTask: Runnable = () => { // do up to batchSize tasks var i = 0 - while (i < batchSize && !tasks.isEmpty()) { - val runnable = tasks.poll() - try runnable.run() + while (i < batchSize && !fibers.isEmpty()) { + val fiber = fibers.poll() + + if (LinkingInfo.developmentMode) + if (fiberBag ne null) + fiberBag -= fiber + + try fiber.run() catch { case t if NonFatal(t) => reportFailure(t) case t: Throwable => IOFiber.onFatalFailure(t) } + i += 1 } - if (!tasks.isEmpty()) // we'll be right back after this (post) message + if (!fibers.isEmpty()) // we'll be right back after this (post) message MacrotaskExecutor.execute(executeBatchTask) else // this batch task will need to be rescheduled when more tasks arrive needsReschedule = true @@ -80,11 +86,16 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) MacrotaskExecutor.execute(monitor(runnable)) /** - * Schedule the `runnable` for the next available batch. This is often the currently executing + * Schedule the `fiber` for the next available batch. This is often the currently executing * batch. */ - def schedule(runnable: Runnable): Unit = { - tasks.addLast(monitor(runnable)) + def schedule(fiber: IOFiber[_]): Unit = { + if (LinkingInfo.developmentMode) + if (fiberBag ne null) + fiberBag += fiber + + fibers.addLast(fiber) + if (needsReschedule) { needsReschedule = false // start executing the batch immediately after the currently running task suspends From 88faeeda9a1c4b9a46e18a3d8c9611e9587a5e99 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 4 Dec 2022 09:46:37 +0000 Subject: [PATCH 16/27] Update comment --- .../scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index d815b9ceea..04b92ffc47 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -73,7 +73,7 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) if (!fibers.isEmpty()) // we'll be right back after this (post) message MacrotaskExecutor.execute(executeBatchTask) - else // this batch task will need to be rescheduled when more tasks arrive + else // the batch task will need to be rescheduled when more fibers arrive needsReschedule = true // yield to the event loop From b414390bccbada6afd006cfaa7032c4204c03c2b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 06:00:13 +0000 Subject: [PATCH 17/27] Make `executeBatchTask` an `object` --- .../unsafe/BatchingMacrotaskExecutor.scala | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 04b92ffc47..ce8a152fd2 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -51,32 +51,33 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) private[this] var needsReschedule = true private[this] val fibers = new ArrayDeque[IOFiber[_]](batchSize) - private[this] def executeBatchTask = _executeBatchTask - private[this] val _executeBatchTask: Runnable = () => { - // do up to batchSize tasks - var i = 0 - while (i < batchSize && !fibers.isEmpty()) { - val fiber = fibers.poll() - - if (LinkingInfo.developmentMode) - if (fiberBag ne null) - fiberBag -= fiber - - try fiber.run() - catch { - case t if NonFatal(t) => reportFailure(t) - case t: Throwable => IOFiber.onFatalFailure(t) - } + private[this] object executeBatchTask extends Runnable { + def run() = { + // do up to batchSize tasks + var i = 0 + while (i < batchSize && !fibers.isEmpty()) { + val fiber = fibers.poll() + + if (LinkingInfo.developmentMode) + if (fiberBag ne null) + fiberBag -= fiber + + try fiber.run() + catch { + case t if NonFatal(t) => reportFailure(t) + case t: Throwable => IOFiber.onFatalFailure(t) + } - i += 1 - } + i += 1 + } - if (!fibers.isEmpty()) // we'll be right back after this (post) message - MacrotaskExecutor.execute(executeBatchTask) - else // the batch task will need to be rescheduled when more fibers arrive - needsReschedule = true + if (!fibers.isEmpty()) // we'll be right back after this (post) message + MacrotaskExecutor.execute(this) + else // the batch task will need to be rescheduled when more fibers arrive + needsReschedule = true - // yield to the event loop + // yield to the event loop + } } /** From f770758b614fc20e35f61ddbfca733fd5996a189 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 06:01:07 +0000 Subject: [PATCH 18/27] Make link-time condition elidable --- .../cats/effect/unsafe/BatchingMacrotaskExecutor.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index ce8a152fd2..1a3bfa19c5 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -129,8 +129,11 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) runnable private[this] val fiberBag = - if (LinkingInfo.developmentMode && TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable) - mutable.Set.empty[IOFiber[_]] + if (LinkingInfo.developmentMode) + if (TracingConstants.isStackTracing && FiberMonitor.weakRefsAvailable) + mutable.Set.empty[IOFiber[_]] + else + null else null From a7900ba6f0998b036ebd41e96c6ab27ac5681ea7 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 06:23:38 +0000 Subject: [PATCH 19/27] Expose config for `BatchingMacrotaskExecutor` --- .../js/src/main/scala/cats/effect/IOApp.scala | 21 +++++++++++++------ .../unsafe/BatchingMacrotaskExecutor.scala | 8 ++++--- .../unsafe/IORuntimeCompanionPlatform.scala | 8 ++++++- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/IOApp.scala b/core/js/src/main/scala/cats/effect/IOApp.scala index 204f43a534..6541f81212 100644 --- a/core/js/src/main/scala/cats/effect/IOApp.scala +++ b/core/js/src/main/scala/cats/effect/IOApp.scala @@ -17,6 +17,7 @@ package cats.effect import cats.effect.metrics.JsCpuStarvationMetrics +import cats.effect.std.Console import cats.effect.tracing.TracingConstants._ import scala.concurrent.CancellationException @@ -161,6 +162,16 @@ trait IOApp { */ protected def runtimeConfig: unsafe.IORuntimeConfig = unsafe.IORuntimeConfig() + /** + * Configures the action to perform when unhandled errors are caught by the runtime. By + * default, this simply delegates to [[cats.effect.std.Console!.printStackTrace]]. It is safe + * to perform any `IO` action within this handler; it will not block the progress of the + * runtime. With that said, some care should be taken to avoid raising unhandled errors as a + * result of handling unhandled errors, since that will result in the obvious chaos. + */ + protected def reportFailure(err: Throwable): IO[Unit] = + Console[IO].printStackTrace(err) + /** * The entry point for your application. Will be called by the runtime when the process is * started. If the underlying runtime supports it, any arguments passed to the process will be @@ -182,12 +193,10 @@ trait IOApp { import unsafe.IORuntime val installed = IORuntime installGlobal { - IORuntime( - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultComputeExecutionContext, - IORuntime.defaultScheduler, - () => (), - runtimeConfig) + val compute = IORuntime.createBatchingMacrotaskExecutor(reportFailure = t => + reportFailure(t).unsafeRunAndForgetWithoutCallback()(runtime)) + + IORuntime(compute, compute, IORuntime.defaultScheduler, () => (), runtimeConfig) } _runtime = IORuntime.global diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 1a3bfa19c5..9159fdab71 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -40,8 +40,10 @@ import java.util.ArrayDeque * @param batchSize * the maximum number of batched runnables to execute before yielding to the event loop */ -private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) - extends ExecutionContextExecutor { +private[effect] final class BatchingMacrotaskExecutor( + batchSize: Int, + reportFailure0: Throwable => Unit +) extends ExecutionContextExecutor { private[this] val MicrotaskExecutor = QueueExecutionContext.promises() @@ -105,7 +107,7 @@ private[effect] final class BatchingMacrotaskExecutor(batchSize: Int) } } - def reportFailure(t: Throwable): Unit = MacrotaskExecutor.reportFailure(t) + def reportFailure(t: Throwable): Unit = reportFailure0(t) def liveTraces(): Map[IOFiber[_], Trace] = fiberBag.iterator.filterNot(_.isDone).map(f => f -> f.captureTrace()).toMap diff --git a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala index c1bb8b770d..41fdc2fe70 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/IORuntimeCompanionPlatform.scala @@ -21,7 +21,13 @@ import scala.concurrent.ExecutionContext private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type => def defaultComputeExecutionContext: ExecutionContext = - new BatchingMacrotaskExecutor(64) + createBatchingMacrotaskExecutor() + + def createBatchingMacrotaskExecutor( + batchSize: Int = 64, + reportFailure: Throwable => Unit = _.printStackTrace() + ): ExecutionContext = + new BatchingMacrotaskExecutor(batchSize, reportFailure) def defaultScheduler: Scheduler = Scheduler.createDefaultScheduler()._1 From ca07e692afbe89f14f6aea132c47666618a8c19b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 18:58:16 +0000 Subject: [PATCH 20/27] First try at `JSArrayQueue` --- .../cats/effect/unsafe/JSArrayQueue.scala | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala diff --git a/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala new file mode 100644 index 0000000000..deeb8877f4 --- /dev/null +++ b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala @@ -0,0 +1,72 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.unsafe + +import scala.scalajs.js + +/** + * A JS-Array backed circular buffer FIFO queue. It is careful to grow the buffer only using + * `push` to avoid creating "holes" on V8 (this is a known shortcoming of the Scala.js + * `j.u.ArrayDeque` implementation). + */ +private final class JSArrayQueue[A] { + + private[this] val buffer = js.Array[A](null.asInstanceOf[A]) + + private[this] var startIndex: Int = 0 + private[this] var endIndex: Int = 1 + private[this] var empty: Boolean = true + + @inline def isEmpty: Boolean = empty + + @inline def take(): A = { + val a = buffer(startIndex) + buffer(startIndex) = null.asInstanceOf[A] + startIndex += 1 + if (startIndex == endIndex) + empty = true + if (startIndex >= buffer.length) + startIndex = 0 + a + } + + @inline def offer(a: A): Unit = { + growIfNeeded() + endIndex += 1 + if (endIndex > buffer.length) + endIndex = 1 + buffer(endIndex - 1) = a + empty = false + } + + @inline private[this] def growIfNeeded(): Unit = + if (!empty) { // empty queue always has capacity >= 1 + if (startIndex == 0 && endIndex == buffer.length) { + buffer.push(null.asInstanceOf[A]) + () + } else if (startIndex == endIndex) { + var i = 0 + while (i < endIndex) { + buffer.push(buffer(i)) + buffer(i) = null.asInstanceOf[A] + i += 1 + } + endIndex *= 2 + } + } + +} From db9a8cdf898958294e549484d3fd946e72363bfa Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 19:05:57 +0000 Subject: [PATCH 21/27] Add `JSArrayQueueSpec` --- .../cats/effect/unsafe/JSArrayQueueSpec.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala diff --git a/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala new file mode 100644 index 0000000000..45625453ac --- /dev/null +++ b/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2020-2022 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect +package unsafe + +import org.scalacheck.Prop.forAll +import org.specs2.ScalaCheck + +import scala.collection.mutable.ListBuffer + +class JSArrayQueueSpec extends BaseSpec with ScalaCheck { + + "JSArrayQueue" should { + "be fifo" in { + forAll { (stuff: List[Option[Int]]) => + val queue = new JSArrayQueue[Int] + val taken = new ListBuffer[Int] + + stuff.foreach { + case Some(i) => queue.offer(i) + case None => + if (!queue.isEmpty) taken += queue.take() + } + + while (!queue.isEmpty) taken += queue.take() + + taken.toList must beEqualTo(stuff.flatten) + } + } + } + +} From b09990ec05f1d535f32a65883268a7de9d1d4ede Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 19:08:07 +0000 Subject: [PATCH 22/27] `isEmpty()` is side-effectful --- core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala | 2 +- .../src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala index deeb8877f4..f8544572bb 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala @@ -31,7 +31,7 @@ private final class JSArrayQueue[A] { private[this] var endIndex: Int = 1 private[this] var empty: Boolean = true - @inline def isEmpty: Boolean = empty + @inline def isEmpty(): Boolean = empty @inline def take(): A = { val a = buffer(startIndex) diff --git a/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala b/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala index 45625453ac..23538314d9 100644 --- a/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala +++ b/tests/js/src/test/scala/cats/effect/unsafe/JSArrayQueueSpec.scala @@ -33,10 +33,10 @@ class JSArrayQueueSpec extends BaseSpec with ScalaCheck { stuff.foreach { case Some(i) => queue.offer(i) case None => - if (!queue.isEmpty) taken += queue.take() + if (!queue.isEmpty()) taken += queue.take() } - while (!queue.isEmpty) taken += queue.take() + while (!queue.isEmpty()) taken += queue.take() taken.toList must beEqualTo(stuff.flatten) } From 7ca1b9c46dc6623a9d971a9a0c46854ebdee6e08 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 19:46:03 +0000 Subject: [PATCH 23/27] Bug fix --- core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala index f8544572bb..14f1764f66 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/JSArrayQueue.scala @@ -65,7 +65,7 @@ private final class JSArrayQueue[A] { buffer(i) = null.asInstanceOf[A] i += 1 } - endIndex *= 2 + endIndex = buffer.length } } From f11e28f3808b70f5574dd542ac18f3d157bf9f9f Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 21 Jan 2023 19:48:14 +0000 Subject: [PATCH 24/27] Use `JSArrayQueue` in batching EC --- .../cats/effect/unsafe/BatchingMacrotaskExecutor.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index 9159fdab71..e0861280fc 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -27,8 +27,6 @@ import scala.scalajs.LinkingInfo import scala.scalajs.concurrent.QueueExecutionContext import scala.util.control.NonFatal -import java.util.ArrayDeque - /** * An `ExecutionContext` that improves throughput by providing a method to `schedule` fibers to * execute in batches, instead of one task per event loop iteration. This optimization targets @@ -51,14 +49,14 @@ private[effect] final class BatchingMacrotaskExecutor( * Whether the `executeBatchTask` needs to be rescheduled */ private[this] var needsReschedule = true - private[this] val fibers = new ArrayDeque[IOFiber[_]](batchSize) + private[this] val fibers = new JSArrayQueue[IOFiber[_]] private[this] object executeBatchTask extends Runnable { def run() = { // do up to batchSize tasks var i = 0 while (i < batchSize && !fibers.isEmpty()) { - val fiber = fibers.poll() + val fiber = fibers.take() if (LinkingInfo.developmentMode) if (fiberBag ne null) @@ -97,7 +95,7 @@ private[effect] final class BatchingMacrotaskExecutor( if (fiberBag ne null) fiberBag += fiber - fibers.addLast(fiber) + fibers.offer(fiber) if (needsReschedule) { needsReschedule = false From 4cf6972ecb4edeb1673d4e6539cdea3ae4f5087b Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 22 Jan 2023 00:30:45 +0000 Subject: [PATCH 25/27] Directly use `queueMicrotask` --- .../effect/unsafe/BatchingMacrotaskExecutor.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index e0861280fc..fe4ebe992e 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -23,8 +23,7 @@ import org.scalajs.macrotaskexecutor.MacrotaskExecutor import scala.collection.mutable import scala.concurrent.ExecutionContextExecutor -import scala.scalajs.LinkingInfo -import scala.scalajs.concurrent.QueueExecutionContext +import scala.scalajs.{js, LinkingInfo} import scala.util.control.NonFatal /** @@ -43,7 +42,13 @@ private[effect] final class BatchingMacrotaskExecutor( reportFailure0: Throwable => Unit ) extends ExecutionContextExecutor { - private[this] val MicrotaskExecutor = QueueExecutionContext.promises() + private[this] val queueMicrotask: js.Function1[js.Function0[Any], Any] = + if (js.typeOf(js.Dynamic.global.queueMicrotask) == "function") + js.Dynamic.global.queueMicrotask.asInstanceOf[js.Function1[js.Function0[Any], Any]] + else { + val resolved = js.Dynamic.global.Promise.resolved(()) + task => resolved.`then`(task) + } /** * Whether the `executeBatchTask` needs to be rescheduled @@ -101,7 +106,8 @@ private[effect] final class BatchingMacrotaskExecutor( needsReschedule = false // start executing the batch immediately after the currently running task suspends // this is safe b/c `needsReschedule` is set to `true` only upon yielding to the event loop - MicrotaskExecutor.execute(executeBatchTask) + queueMicrotask(() => executeBatchTask.run()) + () } } From 51ca39e107912976b32e01c9909b0b9b902976c1 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 22 Jan 2023 00:31:43 +0000 Subject: [PATCH 26/27] Rollback Scala.js bump --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index f4a0b44928..6d8b7703ec 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -2,7 +2,7 @@ libraryDependencies += "org.scala-js" %% "scalajs-env-selenium" % "1.1.1" addSbtPlugin("org.typelevel" % "sbt-typelevel" % "0.5.0-M9") -addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.12.0") +addSbtPlugin("org.scala-js" % "sbt-scalajs" % "1.11.0") addSbtPlugin("org.portable-scala" % "sbt-scalajs-crossproject" % "1.2.0") addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.4.9") addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") From 12afda70f6106c963b993b5b648960bf919a5a18 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sat, 28 Jan 2023 19:43:30 +0000 Subject: [PATCH 27/27] Cache `executeBatchTaskJSFunction` --- .../cats/effect/unsafe/BatchingMacrotaskExecutor.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala index fe4ebe992e..4b41fc6f5b 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/BatchingMacrotaskExecutor.scala @@ -56,7 +56,7 @@ private[effect] final class BatchingMacrotaskExecutor( private[this] var needsReschedule = true private[this] val fibers = new JSArrayQueue[IOFiber[_]] - private[this] object executeBatchTask extends Runnable { + private[this] object executeBatchTaskRunnable extends Runnable { def run() = { // do up to batchSize tasks var i = 0 @@ -85,6 +85,9 @@ private[effect] final class BatchingMacrotaskExecutor( } } + private[this] val executeBatchTaskJSFunction: js.Function0[Any] = + () => executeBatchTaskRunnable.run() + /** * Execute the `runnable` in the next iteration of the event loop. */ @@ -106,7 +109,7 @@ private[effect] final class BatchingMacrotaskExecutor( needsReschedule = false // start executing the batch immediately after the currently running task suspends // this is safe b/c `needsReschedule` is set to `true` only upon yielding to the event loop - queueMicrotask(() => executeBatchTask.run()) + queueMicrotask(executeBatchTaskJSFunction) () } }