Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplatform support #21

Merged
merged 16 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,57 @@ on:
- main
pull_request:
jobs:
test:
test-jvm:
strategy:
fail-fast: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0
- uses: coursier/cache-action@v6
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 21
- name: Get submodule status hash
id: get-submodules
run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT
- uses: actions/cache@v3
with:
path: |
./dependencies/**/target
key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }}-scala-native-only
- name: Build dependencies
run: ./dependencies/publish-deps.sh --scala-native-only
- name: Test
run: sbt rootJVM/test
test-native:
strategy:
fail-fast: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0
- uses: coursier/cache-action@v6
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 19
- name: Get submodule status hash
id: get-submodules
run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT
- uses: actions/cache@v3
with:
path: |
./dependencies/**/target
key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }}
- name: Build dependencies
run: ./dependencies/publish-deps.sh
- name: Install scala-native dependencies
run: sudo apt-get install clang libstdc++-12-dev libgc-dev
- name: Test
run: sbt test
run: sbt rootNative/test
7 changes: 7 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[submodule "dependencies/scala-native"]
path = dependencies/scala-native
url = git@github.com:scala-native/scala-native.git
[submodule "dependencies/munit"]
path = dependencies/munit
url = git@github.com:natsukagami/munit.git
branch = use-0.5-snapshot-sn
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Contributing

## Building `gears`

`gears` currently require:
- **On the JVM**: JVM with support for virtual threads. This usually means JVM 21+, or 19+ with `--enable-preview`.
- **On Scala Native**: Scala Native with delimited continuations support. See the pinned versions in [`dependencies`](./dependencies/README.md).
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@



This is a proof of concept for a base library for asynchronous computing in direct style. The library needs either fibers or virtual threads as a basis. It is at present highly experimental, incomplete and provisional. It is not yet extensively tested and not optimized at all.

The concepts and code here should be regarded as a strawman, in the sense of "meant to be knocked down".
Expand Down
24 changes: 20 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}
import scalanative.build._

ThisBuild / scalaVersion := "3.3.1"

lazy val root = project
lazy val root =
crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("."))
.settings(
.settings(Seq(
name := "Gears",
organization := "ch.epfl.lamp",
version := "0.1.0-SNAPSHOT",
libraryDependencies += "org.scalameta" %% "munit" % "0.7.29" % Test
)
testFrameworks += new TestFramework("munit.Framework")
))
.jvmSettings(Seq(
javaOptions += "--version 21",
libraryDependencies += "org.scalameta" %% "munit" % "1.0.0-M10" % Test
))
.nativeSettings(Seq(
nativeConfig ~= { c =>
c.withMultithreadingSupport(true)
.withGC(GC.boehm) // immix doesn't work yet
},
libraryDependencies += "org.scalameta" %%% "munit" % "1.0.0-M10+15-3940023e-SNAPSHOT" % Test
))
22 changes: 22 additions & 0 deletions dependencies/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Custom Dependencies for Scala Native

Scala Native requires some libraries to be compiled from source and `publishLocal`'d.

### TL; DR

```bash
./publish-deps.sh
```

### What are included?

- The current snapshot version of Scala Native, pinned in `scala-native`: for the delimited continuation support.
This needs to be published for both `3.3.1` (for `gears`) and `3.1.2` (for `munit`):
```bash
sbt "publish-local-dev 3; ++3.1.2 publishLocal"
```
- A fork of `munit` that uses the above snapshot, with a simple fix (https://github.com/scalameta/munit/pull/714) to make it compile.
Pinned in `munit`.
```bash
sbt "munitNative/publishLocal"
```
1 change: 1 addition & 0 deletions dependencies/munit
Submodule munit added at 394002
13 changes: 13 additions & 0 deletions dependencies/publish-deps.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

set -e

cd "$(dirname "${BASH_SOURCE[0]}")" # change to this directory

cd scala-native && sbt 'publish-local-dev 3' && cd ..

if test "$1" != "--scala-native-only"; then
cd scala-native && sbt '++3.1.2 publishLocal' && cd ..
cd munit && sbt "++3.1.2 munitNative/publishLocal" && cd ..
fi

1 change: 1 addition & 0 deletions dependencies/scala-native
Submodule scala-native added at f4b907
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO

import gears.async.{Async, Future, given}
import gears.async.{Async, Future}
import gears.async.default.given
import Future.Promise

import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, ServerSocket, Socket}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, Future, given}
import gears.async.default.given
import gears.async.{Async, Future}
import gears.async.AsyncOperations.*
import PosixLikeIO.{PIOHelper, SocketUDP}

Expand All @@ -9,7 +10,6 @@ import java.nio.ByteBuffer
import java.nio.file.StandardOpenOption
import scala.concurrent.ExecutionContext


@main def clientAndServerUDP(): Unit =
given ExecutionContext = ExecutionContext.global
Async.blocking:
Expand All @@ -30,6 +30,5 @@ import scala.concurrent.ExecutionContext
val messageReceived = String(responseDatagram.getData.slice(0, responseDatagram.getLength), "UTF-8").toInt
println("Sent " + value.toString + " and got " + messageReceived.toString + " in return.")


Async.await(client(100))
Async.await(server)
Async.await(server)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, given}
import gears.async.Async
import gears.async.default.given
import PosixLikeIO.PIOHelper

import java.nio.ByteBuffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, given}
import gears.async.Async
import gears.async.default.given
import PosixLikeIO.PIOHelper

import java.nio.ByteBuffer
Expand Down
7 changes: 7 additions & 0 deletions jvm/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package gears.async.default

import gears.async._

given AsyncOperations = JvmAsyncOperations
given VThreadSupport.type = VThreadSupport
given VThreadSupport.Scheduler = VThreadScheduler
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package gears.async

given JvmAsyncOperations.type = JvmAsyncOperations

object JvmAsyncOperations extends AsyncOperations:

private def jvmInterruptible[T](fn: => T)(using Async): T =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import scala.annotation.unchecked.uncheckedVariance
import java.util.concurrent.locks.ReentrantLock
import scala.concurrent.duration.FiniteDuration

given VThreadScheduler.type = VThreadScheduler
given VThreadSupport.type = VThreadSupport

object VThreadScheduler extends Scheduler:
override def execute(body: Runnable): Unit = Thread.startVirtualThread(body)

override def schedule(delay: FiniteDuration, body: Runnable): Cancellable =
val th = Thread.startVirtualThread: () =>
Thread.sleep(delay.toMillis)
body.run()
() => th.interrupt() // TODO this may interrupt the body after sleeping
execute(body)
() => th.interrupt()

object VThreadSupport extends AsyncSupport:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package measurements

import PosixLikeIO.PIOHelper
import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel, given}
import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel}
import gears.async.default.given

import java.io.{FileReader, FileWriter}
import java.nio.file.{Files, NoSuchFileException, Paths, StandardOpenOption}
Expand Down
21 changes: 21 additions & 0 deletions jvm/src/test/scala/CancellationBehavior.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import gears.async.{Async, Future, AsyncSupport, uninterruptible}
import gears.async.AsyncOperations.*
import gears.async.default.given
import scala.util.boundary
import boundary.break
import scala.concurrent.duration.{Duration, DurationInt}
import java.util.concurrent.CancellationException
import scala.util.Success
import scala.util.Properties

// JVM-only since `munitTimeout` is not available on scala native.
// See (here)[https://scalameta.org/munit/docs/tests.html#customize-test-timeouts].
class JVMCancellationBehavior extends munit.FunSuite:
override def munitTimeout: Duration = 2.seconds
test("no cancel -> timeout".fail):
Async.blocking:
val f = Future:
Thread.sleep(5000)
1
f.result

10 changes: 10 additions & 0 deletions native/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package gears.async.default

import gears.async._
import gears.async.native.ForkJoinSupport

object DefaultSupport extends ForkJoinSupport

given AsyncSupport = DefaultSupport
given DefaultSupport.Scheduler = DefaultSupport
given AsyncOperations = DefaultSupport
91 changes: 91 additions & 0 deletions native/src/main/scala/async/ForkJoinSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package gears.async.native

import gears.async._
import scala.scalanative.runtime.{Continuations => native}
import java.util.concurrent.ForkJoinPool
import scala.concurrent.ExecutionContext
import scala.concurrent.JavaConversions._
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicBoolean
import gears.async.Future.Promise
import java.util.concurrent.CancellationException

class NativeContinuation[-T, +R] private[native] (val cont: T => R) extends Suspension[T, R]:
def resume(arg: T): R = cont(arg)

trait NativeSuspend extends SuspendSupport:
type Label[R] = native.BoundaryLabel[R]
type Suspension[T, R] = NativeContinuation[T, R]

override def boundary[R](body: (Label[R]) ?=> R): R =
native.boundary(body)

override def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T = native.suspend[T, R](f => body(NativeContinuation(f)))
end NativeSuspend

/** Spawns a single thread that does all the sleeping. */
class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionContext with Scheduler {
import scala.collection.mutable
private case class Sleeper(wakeTime: Deadline, toRun: Runnable):
var isCancelled = false
private given Ordering[Sleeper] = Ordering.by((sleeper: Sleeper) => sleeper.wakeTime).reverse
private val sleepers = mutable.PriorityQueue.empty[Sleeper]
private var sleepingUntil: Option[Deadline] = None

override def execute(body: Runnable): Unit = exec.execute(body)
override def reportFailure(cause: Throwable): Unit = exec.reportFailure(cause)
override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = {
val sleeper = Sleeper(delay.fromNow, body)
// push to the sleeping priority queue
this.synchronized {
sleepers += sleeper
val shouldWake = sleepingUntil.map(sleeper.wakeTime < _).getOrElse(true)
if shouldWake then this.notifyAll()
}
() => { sleeper.isCancelled = true }
}

// Sleep until the first timer is due, or .wait() otherwise
private def sleepLoop(): Unit = this.synchronized {
while (true) {
sleepingUntil = sleepers.headOption.map(_.wakeTime)
val current = sleepingUntil match
case None =>
this.wait()
Deadline.now
case Some(value) =>
val current0 = Deadline.now
val timeLeft = value - current0

if timeLeft.toNanos > 0 then
this.wait(timeLeft.toMillis.max(10))
Deadline.now
else current0

// Pop sleepers until no more available
while (sleepers.headOption.exists(_.wakeTime <= current)) {
val task = sleepers.dequeue()
if !task.isCancelled then execute(task.toRun)
}
}
}

val sleeperThread = Thread(() => sleepLoop())
sleeperThread.setDaemon(true)
sleeperThread.start()
}

class SuspendExecutorWithSleep(exec: ExecutionContext) extends ExecutorWithSleepThread(exec)
with AsyncSupport
with AsyncOperations
with NativeSuspend {
type Scheduler = this.type
override def sleep(millis: Long)(using Async): Unit =
Future.withResolver[Unit]: resolver =>
val cancellable = schedule(millis.millis, () => resolver.resolve(()))
resolver.onCancel(cancellable.cancel)
.link()
.value
}

class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool())
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.0-SNAPSHOT")
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ trait AsyncOperations:
def sleep(millis: Long)(using Async): Unit

object AsyncOperations:
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
summon[AsyncOperations].sleep(millis)
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait Cancellable:
/** Add this cancellable to the given group after removing
* it from the previous group in which it was.
*/
def link(group: CompletionGroup): this.type =
def link(group: CompletionGroup): this.type = synchronized:
this.group.drop(this)
this.group = group
this.group.add(this)
Expand Down
Loading