diff --git a/jvm/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala index bda242da..83bb5fec 100644 --- a/jvm/src/main/scala/PosixLikeIO/PIO.scala +++ b/jvm/src/main/scala/PosixLikeIO/PIO.scala @@ -13,7 +13,16 @@ import scala.Tuple.Union import scala.concurrent.ExecutionContext import scala.util.{Failure, Success, Try} +object File: + extension (resolver: Future.Resolver[Int]) + private[File] def toCompletionHandler = new CompletionHandler[Integer, ByteBuffer] { + override def completed(result: Integer, attachment: ByteBuffer): Unit = resolver.resolve(result) + override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e) + } + class File(val path: String) { + import File._ + private var channel: Option[AsynchronousFileChannel] = None def isOpened: Boolean = channel.isDefined && channel.get.isOpen @@ -32,50 +41,41 @@ class File(val path: String) { def read(buffer: ByteBuffer): Future[Int] = assert(channel.isDefined) - val p = Promise[Int]() - channel.get.read( - buffer, - 0, - buffer, - new CompletionHandler[Integer, ByteBuffer] { - override def completed(result: Integer, attachment: ByteBuffer): Unit = p.complete(Success(result)) - override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e)) - } - ) - p.future + Future.withResolver[Int]: resolver => + channel.get.read( + buffer, + 0, + buffer, + resolver.toCompletionHandler + ) def readString(size: Int, charset: Charset = StandardCharsets.UTF_8): Future[String] = assert(channel.isDefined) assert(size >= 0) val buffer = ByteBuffer.allocate(size) - val p = Promise[String]() - channel.get.read( - buffer, - 0, - buffer, - new CompletionHandler[Integer, ByteBuffer] { - override def completed(result: Integer, attachment: ByteBuffer): Unit = - p.complete(Success(charset.decode(attachment.slice(0, result)).toString())) - override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e)) - } - ) - p.future + Future.withResolver[String]: resolver => + channel.get.read( + buffer, + 0, + buffer, + new CompletionHandler[Integer, ByteBuffer] { + override def completed(result: Integer, attachment: ByteBuffer): Unit = + resolver.resolve(charset.decode(attachment.slice(0, result)).toString()) + override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e) + } + ) def write(buffer: ByteBuffer): Future[Int] = assert(channel.isDefined) - val p = Promise[Int]() - channel.get.write( - buffer, - 0, - buffer, - new CompletionHandler[Integer, ByteBuffer] { - override def completed(result: Integer, attachment: ByteBuffer): Unit = p.complete(Success(result)) - override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e)) - } - ) - p.future + Future.withResolver[Int]: resolver => + channel.get.write( + buffer, + 0, + buffer, + resolver.toCompletionHandler + ) def writeString(s: String, charset: Charset = StandardCharsets.UTF_8): Future[Int] = write(ByteBuffer.wrap(s.getBytes(charset))) diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala index 75cda15f..85d076f1 100644 --- a/native/src/main/scala/async/ForkJoinSupport.scala +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -86,7 +86,9 @@ class SuspendExecutorWithSleep(exec: ExecutionContext) Future .withResolver[Unit]: resolver => val cancellable = schedule(millis.millis, () => resolver.resolve(())) - resolver.onCancel(cancellable.cancel) + resolver.onCancel: () => + cancellable.cancel() + resolver.rejectAsCancelled() .link() .await } diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index 3ed56767..c2f3cece 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -23,7 +23,7 @@ class CompletionGroup extends Cancellable.Tracking: private[async] def waitCompletion()(using Async): Unit = synchronized: if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise()) - cancelWait.foreach(cWait => cWait.future.await) + cancelWait.foreach(cWait => cWait.await) unlink() /** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */ diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 6b60614c..e2b98f74 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -21,10 +21,11 @@ trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable object Future: - /** A future that is completed explicitly by calling its `complete` method. There are two public implementations + /** A future that is completed explicitly by calling its `complete` method. There are three public implementations * * - RunnableFuture: Completion is done by running a block of code - * - Promise.future: Completion is done by external request. + * - Promise.apply: Completion is done by external request. + * - withResolver: Completion is done by external request set up from a block of code. */ private class CoreFuture[+T] extends Future[T]: @@ -156,47 +157,6 @@ object Future: def apply[T](body: Async ?=> T)(using Async): Future[T] = RunnableFuture(body) - /** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of - * resolve/reject/complete may be used and only once. - */ - trait Resolver[-T]: - /** Complete the future with a data item successfully */ - def resolve(item: T): Unit = complete(Success(item)) - - /** Complete the future with a failure */ - def reject(exc: Throwable): Unit = complete(Failure(exc)) - - /** Complete the future with the result, be it Success or Failure */ - def complete(result: Try[T]): Unit - - /** Register a cancellation handler to be called when the created future is cancelled. Note that only one handler - * may be used. - */ - def onCancel(handler: () => Unit): Unit - end Resolver - - /** Create a future that may be completed asynchronously using external means. - * - * The body is run synchronously on the callers thread to setup an external asynchronous operation whose - * success/failure it communicates using the [[Resolver]] to complete the future. - * - * If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]]. - */ - def withResolver[T](body: Resolver[T] => Unit): Future[T] = - val future = new CoreFuture[T] with Resolver[T] { - @volatile var cancelHandle = () => () - override def onCancel(handler: () => Unit): Unit = cancelHandle = handler - override def complete(result: Try[T]): Unit = super.complete(result) - - override def cancel(): Unit = - if setCancelled() then - cancelHandle() - reject(CancellationException()) - } - body(future) - future - end withResolver - /** A future that immediately terminates with the given result */ def now[T](result: Try[T]): Future[T] = val f = CoreFuture[T]() @@ -261,23 +221,69 @@ object Future: end extension - /** A promise defines a future that is be completed via the promise's `complete` method. + /** A promise defines a future that is be completed via the `complete` method. */ - class Promise[T]: - private val myFuture = new CoreFuture[T]: - fut => - override def cancel(): Unit = - if setCancelled() then fut.complete(Failure(new CancellationException())) + trait Promise[T] extends Future[T]: + inline def asFuture: Future[T] = this - /** The future defined by this promise */ - val future: Future[T] = myFuture + /** Define the result value of `future`. */ + def complete(result: Try[T]): Unit - /** Define the result value of `future`. However, if `future` was cancelled in the meantime complete with a - * `CancellationException` failure instead. - */ - def complete(result: Try[T]): Unit = myFuture.complete(result) + object Promise: + def apply[T](): Promise[T] = + new CoreFuture[T] with Promise[T]: + override def cancel(): Unit = + if setCancelled() then complete(Failure(new CancellationException())) + + /** Define the result value of `future`. However, if `future` was cancelled in the meantime complete with a + * `CancellationException` failure instead. + */ + override def complete(result: Try[T]): Unit = super[CoreFuture].complete(result) end Promise + /** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of + * resolve/reject/complete may be used and only once. + */ + trait Resolver[-T]: + /** Complete the future with a data item successfully */ + def resolve(item: T): Unit = complete(Success(item)) + + /** Complete the future with a failure */ + def reject(exc: Throwable): Unit = complete(Failure(exc)) + + /** Complete the future with a [[CancellationException]] */ + def rejectAsCancelled(): Unit = complete(Failure(new CancellationException())) + + /** Complete the future with the result, be it Success or Failure */ + def complete(result: Try[T]): Unit + + /** Register a cancellation handler to be called when the created future is cancelled. Note that only one handler + * may be used. The handler should eventually complete the Future using one of complete/resolve/reject*. The + * default handler is set up to [[rejectAsCancelled]] immediately. + */ + def onCancel(handler: () => Unit): Unit + end Resolver + + /** Create a promise that may be completed asynchronously using external means. + * + * The body is run synchronously on the callers thread to setup an external asynchronous operation whose + * success/failure it communicates using the [[Resolver]] to complete the future. + * + * If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]]. + */ + def withResolver[T](body: Resolver[T] => Unit): Future[T] = + val future = new CoreFuture[T] with Resolver[T] with Promise[T] { + @volatile var cancelHandle = () => rejectAsCancelled() + override def onCancel(handler: () => Unit): Unit = cancelHandle = handler + override def complete(result: Try[T]): Unit = super.complete(result) + + override def cancel(): Unit = + if setCancelled() then cancelHandle() + } + body(future) + future + end withResolver + /** Collects a list of futures into a channel of futures, arriving as they finish. */ class Collector[T](futures: Future[T]*): private val ch = UnboundedChannel[Future[T]]() diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index f882a8f4..4df7e435 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -77,7 +77,7 @@ class CancellationBehavior extends munit.FunSuite: val promise = Future.Promise[Unit]() Async.group: startFuture(info, promise.complete(Success(()))) - promise.future.await + promise.await info.assertCancelled() test("nested link group"): @@ -89,13 +89,13 @@ class CancellationBehavior extends munit.FunSuite: info1, { Async.group: startFuture(info2, promise2.complete(Success(()))) - promise2.future.await + promise2.await info2.assertCancelled() Future.now(Success(())).await // check cancellation promise1.complete(Success(())) } ) - promise1.future.await + promise1.await info1.assertCancelled() info2.assertCancelled() @@ -123,6 +123,6 @@ class CancellationBehavior extends munit.FunSuite: Async.group: Async.current.group.cancel() // cancel now val f = startFuture(info, promise.complete(Success(()))) - promise.future.awaitResult + promise.awaitResult f.awaitResult info.assertCancelled() diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index 0b0813d2..d41438af 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -10,6 +10,7 @@ import scala.util.{Failure, Success, Try} import scala.util.Random import scala.collection.mutable.Set import java.util.concurrent.atomic.AtomicInteger +import gears.async.Listener class FutureBehavior extends munit.FunSuite { given ExecutionContext = ExecutionContext.global @@ -302,7 +303,7 @@ class FutureBehavior extends munit.FunSuite { test("Promise can be cancelled") { Async.blocking: val p = Promise[Int]() - val f = p.future + val f = p.asFuture f.cancel() p.complete(Success(10)) f.awaitResult match @@ -314,7 +315,7 @@ class FutureBehavior extends munit.FunSuite { Async.blocking: val p = Promise[Int]() p.complete(Success(10)) - val f = p.future + val f = p.asFuture f.cancel() assertEquals(f.await, 10) } @@ -338,6 +339,18 @@ class FutureBehavior extends munit.FunSuite { assertEquals(num.get(), 0) } + test("Future.withResolver is only completed after handler decides") { + val prom = Future.Promise[Unit]() + val fut = Future.withResolver[Unit]: r => + r.onCancel(() => prom.onComplete(Listener { (_, _) => r.rejectAsCancelled() })) + + assert(fut.poll().isEmpty) + fut.cancel() + assert(fut.poll().isEmpty) + prom.complete(Success(())) + assert(fut.poll().isDefined) + } + test("Nesting of cancellations") { Async.blocking: var touched1 = false diff --git a/shared/src/test/scala/ListenerBehavior.scala b/shared/src/test/scala/ListenerBehavior.scala index 2cf14618..24a6c141 100644 --- a/shared/src/test/scala/ListenerBehavior.scala +++ b/shared/src/test/scala/ListenerBehavior.scala @@ -24,7 +24,7 @@ class ListenerBehavior extends munit.FunSuite: val prom1 = Promise[Unit]() val prom2 = Promise[Unit]() Async.blocking: - val raced = race(Future { prom1.future.await; 10 }, Future { prom2.future.await; 20 }) + val raced = race(Future { prom1.await; 10 }, Future { prom2.await; 20 }) assert(!raced.poll(Listener.acceptingListener((x, _) => fail(s"race uncomplete $x")))) prom1.complete(Success(())) assertEquals(raced.await, 10) @@ -330,7 +330,7 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean, if sleep.getAndSet(false) then Async.blocking: waiter = Some(Promise()) - waiter.get.future.await + waiter.get.await waiter.foreach: promise => promise.complete(Success(())) waiter = None diff --git a/shared/src/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala index 58f60a4f..78749556 100644 --- a/shared/src/test/scala/SourceBehavior.scala +++ b/shared/src/test/scala/SourceBehavior.scala @@ -146,11 +146,11 @@ class SourceBehavior extends munit.FunSuite { val g = f.transformValuesWith(identity) f.onComplete(Listener.acceptingListener { (_, _) => aRan.complete(Success(())) }) g.onComplete(Listener.acceptingListener { (_, _) => bRan.complete(Success(())) }) - assertEquals(aRan.future.poll(), None) - assertEquals(bRan.future.poll(), None) + assertEquals(aRan.poll(), None) + assertEquals(bRan.poll(), None) f.await Thread.sleep(100) // onComplete of await and manual may be scheduled - aRan.future.zip(bRan.future).alt(Future(sleep(600))).await + aRan.zip(bRan).alt(Future(sleep(600))).await } test("either") {