From e5d766bbf19490667132b075f408eba58938f3d2 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sun, 19 Jan 2025 11:30:49 +0800 Subject: [PATCH] chore: Proper test virtual thread dispatcher and update doc. (#1728) --- .../dispatch/VirtualThreadPoolDispatcherSpec.scala | 12 +++++++----- .../apache/pekko/dispatch/AbstractDispatcher.scala | 4 ++-- .../apache/pekko/dispatch/VirtualThreadSupport.scala | 1 + docs/src/main/paradox/dispatchers.md | 2 ++ docs/src/main/paradox/typed/dispatchers.md | 2 ++ project/PekkoBuild.scala | 1 + 6 files changed, 15 insertions(+), 7 deletions(-) diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 739bf26a5e3..3050d70db5d 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec { override def receive = { case "ping" => - sender() ! "All fine" + sender() ! Thread.currentThread().getName } } @@ -43,14 +43,16 @@ object VirtualThreadPoolDispatcherSpec { class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { import VirtualThreadPoolDispatcherSpec._ - val Iterations = 1000 - "VirtualThreadPool support" must { "handle simple dispatch" in { val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher")) - innocentActor ! "ping" - expectMsg("All fine") + for (_ <- 1 to 1000) { + innocentActor ! "ping" + expectMsgPF() { case name: String => + name should include("VirtualThreadPoolDispatcherSpec-virtual-thread-dispatcher-virtual-thread-") + } + } } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index 40c7b4a772c..bb9b301eb32 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -417,7 +417,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis val tf: ThreadFactory = threadFactory match { case MonitorableThreadFactory(name, _, contextClassLoader, exceptionHandler, _) => new ThreadFactory { - private val vtFactory = newVirtualThreadFactory(name) + private val vtFactory = newVirtualThreadFactory(name + "-" + id) override def newThread(r: Runnable): Thread = { val vt = vtFactory.newThread(r) @@ -426,7 +426,7 @@ final class VirtualThreadExecutorConfigurator(config: Config, prerequisites: Dis vt } } - case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name); + case _ => VirtualThreadSupport.newVirtualThreadFactory(prerequisites.settings.name + "-" + id); } new ExecutorServiceFactory { import VirtualThreadSupport._ diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala index 3a777891559..484e600fc35 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -68,6 +68,7 @@ private[dispatch] object VirtualThreadSupport { newThreadPerTaskExecutorMethod.invoke(threadFactory).asInstanceOf[ExecutorService] } catch { case NonFatal(e) => + // --add-opens java.base/java.lang=ALL-UNNAMED throw new UnsupportedOperationException("Failed to create newThreadPerTaskExecutor.", e) } } diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md index 5e804803967..cac3d0f8f3d 100644 --- a/docs/src/main/paradox/dispatchers.md +++ b/docs/src/main/paradox/dispatchers.md @@ -42,6 +42,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + @@@ Another example that uses the "thread-pool-executor": diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index 2347dc73001..b3546a4416f 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -127,6 +127,8 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool will keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + @@@ @@@ note diff --git a/project/PekkoBuild.scala b/project/PekkoBuild.scala index 03052f9c6f5..b18682d6df0 100644 --- a/project/PekkoBuild.scala +++ b/project/PekkoBuild.scala @@ -290,6 +290,7 @@ object PekkoBuild { UsefulTask("testQuick", "Runs all the tests. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testOnly *.AnySpec", "Only run a selected test"), + UsefulTask("TestJdk9 / testOnly *.AnySpec", "Only run a Jdk9+ selected test"), UsefulTask("testQuick *.AnySpec", "Only run a selected test. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testQuickUntilPassed", "Runs all tests in a continuous loop until all tests pass"),