diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt index f003cfa86a3..62e0ea9ffc2 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonMain/kotlin/arrow/fx/coroutines/Resource.kt @@ -3,6 +3,15 @@ package arrow.fx.coroutines import arrow.core.Either import arrow.core.andThen import arrow.core.identity +import arrow.core.nonFatalOrThrow +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.async +import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.withContext +import kotlin.coroutines.CoroutineContext /** * [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other @@ -110,8 +119,16 @@ public sealed class Resource { public suspend infix fun use(f: suspend (A) -> B): B = useLoop(this as Resource, f as suspend (Any?) -> Any?, emptyList()) as B + @Deprecated("Here for binary compat reasons", level = DeprecationLevel.HIDDEN) public fun map(f: (A) -> B): Resource = - flatMap { a -> just(f(a)) } + flatMap { a -> Resource({ f(a) }) { _, _ -> } } + + public fun map(f: suspend (A) -> B): Resource = + flatMap { a -> Resource({ f(a) }) { _, _ -> } } + + /** Useful for setting up/configuring an acquired resource */ + public fun tap(f: suspend (A) -> Unit): Resource = + map { f(it); it } public fun ap(ff: Resource<(A) -> B>): Resource = flatMap { res -> ff.map { it(res) } } @@ -149,7 +166,7 @@ public sealed class Resource { Bind(this, f) public fun zip(other: Resource, combine: (A, B) -> C): Resource = - flatMap { r -> + flatMap { r -> other.map { r2 -> combine(r, r2) } } @@ -274,6 +291,48 @@ public sealed class Resource { } } + public fun parZip(fb: Resource, f: (A, B) -> C): Resource = + parZip(Dispatchers.Default, fb, f) + + /** + * Composes two [Resource]s together by zipping them in parallel, + * by running both their `acquire` handlers in parallel, and both `release` handlers in parallel. + */ + public fun parZip( + ctx: CoroutineContext = Dispatchers.Default, + fb: Resource, + f: (A, B) -> C + ): Resource = + Resource({ + supervisorScope { + val faa = async(ctx) { allocate() } + val fbb = async(ctx) { fb.allocate() } + val a = awaitOrCancelOther(faa, fbb) + val b = awaitOrCancelOther(fbb, faa) + Pair(a, b) + } + }, { (ar, br), ex -> + val (_, releaseA) = ar + val (_, releaseB) = br + supervisorScope { + val faa = async(ctx) { releaseA(ex) } + val fbb = async(ctx) { releaseB(ex) } + try { + faa.await() + } catch (errorA: Throwable) { + try { + fbb.await() + } catch (errorB: Throwable) { + throw Platform.composeErrors(errorA, errorB) + } + throw errorA + } + fbb.await() + } + }).map { (ar, br) -> + f(ar.first, br.first) + } + public class Bind(public val source: Resource, public val f: (A) -> Resource) : Resource() public class Allocate( @@ -317,6 +376,7 @@ public sealed class Resource { * * @see [use] For a version that provides an [ExitCase] to [release] */ + @Deprecated("Conflicts with other invoke constructor", ReplaceWith("Resource(acquire) { a, _ -> release(a) }")) public operator fun invoke( acquire: suspend () -> A, release: suspend (A) -> Unit @@ -516,3 +576,78 @@ public inline fun Iterable.traverseResource(crossinline f: (A) -> Reso @Suppress("NOTHING_TO_INLINE") public inline fun Iterable>.sequence(): Resource> = traverseResource(::identity) + +// Interpreter that knows how to evaluate a Resource data structure +// Maintains its own stack for dealing with Bind chains +@Suppress("UNCHECKED_CAST") +private tailrec suspend fun useLoop( + current: Resource, + stack: List<(Any?) -> Resource> +): Pair Unit> = + when (current) { + is Resource.Defer -> useLoop(current.resource.invoke(), stack) + is Resource.Bind<*, *> -> + useLoop(current.source, listOf(current.f as (Any?) -> Resource) + stack) + is Resource.Allocate -> loadResourceAndReleaseHandler( + acquire = current.acquire, + use = { a -> + when { + stack.isEmpty() -> Pair(a) { ex -> current.release(a, ex) } + else -> useLoop(stack.first()(a), stack.drop(1)) + } + }, + release = { _, _ -> /*a, exitCase -> current.release(a, exitCase)*/ } + ) + } + +private suspend fun Resource.allocate(): Pair Unit> = + useLoop(this, emptyList()) as Pair Unit> + +private suspend inline fun loadResourceAndReleaseHandler( + crossinline acquire: suspend () -> Any?, + crossinline use: suspend (Any?) -> Pair Unit>, + crossinline release: suspend (Any?, ExitCase) -> Unit +): Pair Unit> { + val acquired = withContext(NonCancellable) { + acquire() + } + + return try { // Successfully loaded resource, pass it and its release f down + val (b, _release) = use(acquired) + Pair(b) { ex -> _release(ex); release(acquired, ex) } + } catch (e: CancellationException) { // Release when cancelled + runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) } + } catch (t: Throwable) { // Release when failed to load resource + runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) } + } +} + +private suspend fun awaitOrCancelOther( + fa: Deferred Unit>>, + fb: Deferred Unit>> +): Pair Unit> = + try { + fa.await() + } catch (e: Throwable) { + if (e is CancellationException) awaitAndAddSuppressed(fb, e, ExitCase.Cancelled(e)) + else awaitAndAddSuppressed(fb, e, ExitCase.Failure(e)) + } + +private suspend fun awaitAndAddSuppressed( + fb: Deferred Unit>>, + e: Throwable, + exitCase: ExitCase +): Nothing { + val cancellationException = try { + if (fb.isCancelled && fb.isCompleted) fb.getCompletionExceptionOrNull() + else fb.await().second.invoke(exitCase).let { null } + } catch (e2: Throwable) { + throw e.apply { addSuppressed(e2) } + } + + val exception = cancellationException?.let { + e.apply { addSuppressed(it) } + } ?: e + + throw exception +} diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt index da9d68e099f..3d42ae4314d 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/commonTest/kotlin/arrow/fx/coroutines/ResourceTest.kt @@ -1,14 +1,17 @@ package arrow.fx.coroutines import arrow.core.Either +import io.kotest.assertions.fail import io.kotest.matchers.should import io.kotest.matchers.shouldBe import io.kotest.matchers.types.shouldBeInstanceOf +import io.kotest.matchers.types.shouldBeTypeOf import io.kotest.property.Arb import io.kotest.property.arbitrary.int import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.map import io.kotest.property.arbitrary.string +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async @@ -17,7 +20,7 @@ class ResourceTest : ArrowFxSpec( "Can consume resource" { checkAll(Arb.int()) { n -> - val r = Resource({ n }, { _ -> Unit }) + val r = Resource({ n }, { _, _ -> Unit }) r.use { it + 1 } shouldBe n + 1 } @@ -105,6 +108,195 @@ class ResourceTest : ArrowFxSpec( mutable.toList() shouldBe list } } + + "parZip - Right CancellationException on acquire" { + checkAll(Arb.int()) { i -> + val cancel = CancellationException(null, null) + val released = CompletableDeferred>() + + assertThrowable { + Resource({ i }, { ii, ex -> + released.complete(ii to ex) + }).parZip(Resource({ throw cancel }) { _, _ -> }) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Left CancellationException on acquire" { + checkAll(Arb.int()) { i -> + val cancel = CancellationException(null, null) + val released = CompletableDeferred>() + + assertThrowable { + Resource({ throw cancel }) { _, _ -> } + .parZip(Resource({ i }, { ii, ex -> + released.complete(ii to ex) + })) { _, _ -> } + .use { fail("It should never reach here") } + }.shouldBeTypeOf() + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Right error on acquire" { + checkAll(Arb.int(), Arb.throwable()) { i, throwable -> + val released = CompletableDeferred>() + + assertThrowable { + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + .parZip( + Resource({ throw throwable }) { _, _ -> } + ) { _, _ -> } + .use { fail("It should never reach here") } + } shouldBe throwable + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Left error on acquire" { + checkAll(Arb.int(), Arb.throwable()) { i, throwable -> + val released = CompletableDeferred>() + + assertThrowable { + Resource({ throw throwable }) { _, _ -> } + .parZip( + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + ) { _, _ -> } + .use { fail("It should never reach here") } + } shouldBe throwable + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Right CancellationException on release" { + checkAll(Arb.int()) { i -> + val cancel = CancellationException(null, null) + val released = CompletableDeferred>() + + assertThrowable { + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + .parZip( + Resource({ }) { _, _ -> throw cancel } + ) { _, _ -> } + .use { } + }.shouldBeTypeOf() + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Left CancellationException on release" { + checkAll(Arb.int()) { i -> + val cancel = CancellationException(null, null) + val released = CompletableDeferred>() + + assertThrowable { + Resource({ }) { _, _ -> throw cancel } + .parZip( + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + ) { _, _ -> } + .use { /*fail("It should never reach here")*/ } + }.shouldBeTypeOf() + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Right error on release" { + checkAll(Arb.int(), Arb.throwable()) { i, throwable -> + val released = CompletableDeferred>() + + assertThrowable { + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + .parZip( + Resource({ }) { _, _ -> throw throwable } + ) { _, _ -> } + .use { } + } shouldBe throwable + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - Left error on release" { + checkAll(Arb.int(), Arb.throwable()) { i, throwable -> + val released = CompletableDeferred>() + + assertThrowable { + Resource({ }) { _, _ -> throw throwable } + .parZip( + Resource({ i }, { ii, ex -> released.complete(ii to ex) }) + ) { _, _ -> } + .use { } + } shouldBe throwable + + val (ii, ex) = released.await() + ii shouldBe i + ex.shouldBeTypeOf() + } + } + + "parZip - error in use" { + checkAll(Arb.int(), Arb.int(), Arb.throwable()) { a, b, throwable -> + val releasedA = CompletableDeferred>() + val releasedB = CompletableDeferred>() + + assertThrowable { + Resource({ a }) { aa, ex -> releasedA.complete(aa to ex) } + .parZip( + Resource({ b }) { bb, ex -> releasedB.complete(bb to ex) } + ) { _, _ -> } + .use { throw throwable } + } shouldBe throwable + + val (aa, exA) = releasedA.await() + aa shouldBe a + exA.shouldBeTypeOf() + + val (bb, exB) = releasedB.await() + bb shouldBe b + exB.shouldBeTypeOf() + } + } + + "parZip - runs in parallel" { + checkAll(Arb.int(), Arb.int()) { a, b -> + val r = Atomic("") + val modifyGate = CompletableDeferred() + + Resource({ + modifyGate.await() + r.update { i -> "$i$a" } + }) { _, _ -> } + .parZip(Resource({ + r.set("$b") + modifyGate.complete(0) + }) { _, _ -> }) { _a, _b -> _a to _b } + .use { + r.get() shouldBe "$b$a" + } + } + } } ) diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmMain/kotlin/arrow/fx/coroutines/ResourceExtensions.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmMain/kotlin/arrow/fx/coroutines/ResourceExtensions.kt index d80001ee2b2..9279d9ef708 100644 --- a/arrow-libs/fx/arrow-fx-coroutines/src/jvmMain/kotlin/arrow/fx/coroutines/ResourceExtensions.kt +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmMain/kotlin/arrow/fx/coroutines/ResourceExtensions.kt @@ -38,7 +38,7 @@ import kotlin.coroutines.CoroutineContext * ``` */ public fun Resource.Companion.fromExecutor(f: suspend () -> ExecutorService): Resource = - Resource(f) { s -> s.shutdown() }.map(ExecutorService::asCoroutineDispatcher) + Resource(f) { s, _ -> s.shutdown() }.map(ExecutorService::asCoroutineDispatcher) /** * Creates a [Resource] from an [Closeable], which uses [Closeable.close] for releasing. @@ -48,16 +48,39 @@ public fun Resource.Companion.fromExecutor(f: suspend () -> ExecutorService): Re * import java.io.FileInputStream * * suspend fun copyFile(src: String, dest: String): Unit = - * Resource.fromClosable { FileInputStream(src) } - * .zip(Resource.fromClosable { FileInputStream(dest) }) + * Resource.fromCloseable { FileInputStream(src) } + * .zip(Resource.fromCloseable { FileInputStream(dest) }) * .use { (a: FileInputStream, b: FileInputStream) -> * /** read from [a] and write to [b]. **/ * // Both resources will be closed accordingly to their #close methods * } * ``` */ +public fun Resource.Companion.fromCloseable(f: suspend () -> A): Resource = + Resource(f) { s, _ -> withContext(Dispatchers.IO) { s.close() } } + +@Deprecated("Typo in the function name, use fromCloseable instead.", ReplaceWith("Resource.fromCloseable(f)")) public fun Resource.Companion.fromClosable(f: suspend () -> A): Resource = - Resource(f) { s -> withContext(Dispatchers.IO) { s.close() } } + fromCloseable(f) + +/** + * Creates a [Resource] from an [AutoCloseable], which uses [AutoCloseable.close] for releasing. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * import java.io.FileInputStream + * + * suspend fun copyFile(src: String, dest: String): Unit = + * Resource.fromAutoCloseable { FileInputStream(src) } + * .zip(Resource.fromAutoCloseable { FileInputStream(dest) }) + * .use { (a: FileInputStream, b: FileInputStream) -> + * /** read from [a] and write to [b]. **/ + * // Both resources will be closed accordingly to their #close methods + * } + * ``` + */ +public fun Resource.Companion.fromAutoCloseable(f: suspend () -> A): Resource = + Resource(f) { s, _ -> withContext(Dispatchers.IO) { s.close() } } /** * Creates a single threaded [CoroutineContext] as a [Resource]. diff --git a/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/ResourceTestJvm.kt b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/ResourceTestJvm.kt new file mode 100644 index 00000000000..f2049b4062e --- /dev/null +++ b/arrow-libs/fx/arrow-fx-coroutines/src/jvmTest/kotlin/arrow/fx/coroutines/ResourceTestJvm.kt @@ -0,0 +1,69 @@ +package arrow.fx.coroutines + +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.shouldBe +import java.util.concurrent.atomic.AtomicBoolean +import java.lang.AutoCloseable +import java.io.Closeable +import io.kotest.property.Arb + +class ResourceTestJvm : ArrowFxSpec(spec = { + + class AutoCloseableTest() : AutoCloseable { + val didClose = AtomicBoolean(false) + override fun close() = didClose.set(true) + } + + class CloseableTest() : Closeable { + val didClose = AtomicBoolean(false) + override fun close() = didClose.set(true) + } + + "AutoCloseable closes" { + checkAll { + val t = AutoCloseableTest() + + Resource.fromAutoCloseable { t } + .use {} + + t.didClose.get() shouldBe true + } + } + + "AutoCloseable closes on error" { + checkAll(Arb.throwable()) { throwable -> + val t = AutoCloseableTest() + + shouldThrow { + Resource.fromAutoCloseable { t } + .use { throw throwable } + } shouldBe throwable + + t.didClose.get() shouldBe true + } + } + + "Closeable closes" { + checkAll() { + val t = CloseableTest() + + Resource.fromCloseable { t } + .use {} + + t.didClose.get() shouldBe true + } + } + + "Closeable closes on error" { + checkAll(Arb.throwable()) { throwable -> + val t = CloseableTest() + + shouldThrow { + Resource.fromCloseable { t } + .use { throw throwable } + } shouldBe throwable + + t.didClose.get() shouldBe true + } + } +})