Skip to content

Commit

Permalink
Resource parZip & fromAutoCloseable (#2452)
Browse files Browse the repository at this point in the history
* fix docs

* Add Resourc parZip

* Uncomment test, and remove deprecated usage

* Fix JS CancellationException ambiguity

* Replace shouldThrow with assertThrowable for JS runtime.
  • Loading branch information
nomisRev committed Aug 2, 2021
1 parent ddbac3e commit 88365eb
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@ package arrow.fx.coroutines
import arrow.core.Either
import arrow.core.andThen
import arrow.core.identity
import arrow.core.nonFatalOrThrow
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

/**
* [Resource] models resource allocation and releasing. It is especially useful when multiple resources that depend on each other
Expand Down Expand Up @@ -110,8 +119,16 @@ public sealed class Resource<out A> {
public suspend infix fun <B> use(f: suspend (A) -> B): B =
useLoop(this as Resource<Any?>, f as suspend (Any?) -> Any?, emptyList()) as B

@Deprecated("Here for binary compat reasons", level = DeprecationLevel.HIDDEN)
public fun <B> map(f: (A) -> B): Resource<B> =
flatMap { a -> just(f(a)) }
flatMap { a -> Resource({ f(a) }) { _, _ -> } }

public fun <B> map(f: suspend (A) -> B): Resource<B> =
flatMap { a -> Resource({ f(a) }) { _, _ -> } }

/** Useful for setting up/configuring an acquired resource */
public fun <B> tap(f: suspend (A) -> Unit): Resource<A> =
map { f(it); it }

public fun <B> ap(ff: Resource<(A) -> B>): Resource<B> =
flatMap { res -> ff.map { it(res) } }
Expand Down Expand Up @@ -149,7 +166,7 @@ public sealed class Resource<out A> {
Bind(this, f)

public fun <B, C> zip(other: Resource<B>, combine: (A, B) -> C): Resource<C> =
flatMap { r ->
flatMap { r ->
other.map { r2 -> combine(r, r2) }
}

Expand Down Expand Up @@ -274,6 +291,48 @@ public sealed class Resource<out A> {
}
}

public fun <B, C> parZip(fb: Resource<B>, f: (A, B) -> C): Resource<C> =
parZip(Dispatchers.Default, fb, f)

/**
* Composes two [Resource]s together by zipping them in parallel,
* by running both their `acquire` handlers in parallel, and both `release` handlers in parallel.
*/
public fun <B, C> parZip(
ctx: CoroutineContext = Dispatchers.Default,
fb: Resource<B>,
f: (A, B) -> C
): Resource<C> =
Resource({
supervisorScope {
val faa = async(ctx) { allocate() }
val fbb = async(ctx) { fb.allocate() }
val a = awaitOrCancelOther(faa, fbb)
val b = awaitOrCancelOther(fbb, faa)
Pair(a, b)
}
}, { (ar, br), ex ->
val (_, releaseA) = ar
val (_, releaseB) = br
supervisorScope {
val faa = async(ctx) { releaseA(ex) }
val fbb = async(ctx) { releaseB(ex) }
try {
faa.await()
} catch (errorA: Throwable) {
try {
fbb.await()
} catch (errorB: Throwable) {
throw Platform.composeErrors(errorA, errorB)
}
throw errorA
}
fbb.await()
}
}).map { (ar, br) ->
f(ar.first, br.first)
}

public class Bind<A, B>(public val source: Resource<A>, public val f: (A) -> Resource<B>) : Resource<B>()

public class Allocate<A>(
Expand Down Expand Up @@ -317,6 +376,7 @@ public sealed class Resource<out A> {
*
* @see [use] For a version that provides an [ExitCase] to [release]
*/
@Deprecated("Conflicts with other invoke constructor", ReplaceWith("Resource(acquire) { a, _ -> release(a) }"))
public operator fun <A> invoke(
acquire: suspend () -> A,
release: suspend (A) -> Unit
Expand Down Expand Up @@ -516,3 +576,78 @@ public inline fun <A, B> Iterable<A>.traverseResource(crossinline f: (A) -> Reso
@Suppress("NOTHING_TO_INLINE")
public inline fun <A> Iterable<Resource<A>>.sequence(): Resource<List<A>> =
traverseResource(::identity)

// Interpreter that knows how to evaluate a Resource data structure
// Maintains its own stack for dealing with Bind chains
@Suppress("UNCHECKED_CAST")
private tailrec suspend fun useLoop(
current: Resource<Any?>,
stack: List<(Any?) -> Resource<Any?>>
): Pair<Any?, suspend (ExitCase) -> Unit> =
when (current) {
is Resource.Defer -> useLoop(current.resource.invoke(), stack)
is Resource.Bind<*, *> ->
useLoop(current.source, listOf(current.f as (Any?) -> Resource<Any?>) + stack)
is Resource.Allocate -> loadResourceAndReleaseHandler(
acquire = current.acquire,
use = { a ->
when {
stack.isEmpty() -> Pair(a) { ex -> current.release(a, ex) }
else -> useLoop(stack.first()(a), stack.drop(1))
}
},
release = { _, _ -> /*a, exitCase -> current.release(a, exitCase)*/ }
)
}

private suspend fun <A> Resource<A>.allocate(): Pair<A, suspend (ExitCase) -> Unit> =
useLoop(this, emptyList()) as Pair<A, suspend (ExitCase) -> Unit>

private suspend inline fun loadResourceAndReleaseHandler(
crossinline acquire: suspend () -> Any?,
crossinline use: suspend (Any?) -> Pair<Any?, suspend (ExitCase) -> Unit>,
crossinline release: suspend (Any?, ExitCase) -> Unit
): Pair<Any?, suspend (ExitCase) -> Unit> {
val acquired = withContext(NonCancellable) {
acquire()
}

return try { // Successfully loaded resource, pass it and its release f down
val (b, _release) = use(acquired)
Pair(b) { ex -> _release(ex); release(acquired, ex) }
} catch (e: CancellationException) { // Release when cancelled
runReleaseAndRethrow(e) { release(acquired, ExitCase.Cancelled(e)) }
} catch (t: Throwable) { // Release when failed to load resource
runReleaseAndRethrow(t.nonFatalOrThrow()) { release(acquired, ExitCase.Failure(t.nonFatalOrThrow())) }
}
}

private suspend fun <A, B> awaitOrCancelOther(
fa: Deferred<Pair<A, suspend (ExitCase) -> Unit>>,
fb: Deferred<Pair<B, suspend (ExitCase) -> Unit>>
): Pair<A, suspend (ExitCase) -> Unit> =
try {
fa.await()
} catch (e: Throwable) {
if (e is CancellationException) awaitAndAddSuppressed(fb, e, ExitCase.Cancelled(e))
else awaitAndAddSuppressed(fb, e, ExitCase.Failure(e))
}

private suspend fun awaitAndAddSuppressed(
fb: Deferred<Pair<*, suspend (ExitCase) -> Unit>>,
e: Throwable,
exitCase: ExitCase
): Nothing {
val cancellationException = try {
if (fb.isCancelled && fb.isCompleted) fb.getCompletionExceptionOrNull()
else fb.await().second.invoke(exitCase).let { null }
} catch (e2: Throwable) {
throw e.apply { addSuppressed(e2) }
}

val exception = cancellationException?.let {
e.apply { addSuppressed(it) }
} ?: e

throw exception
}
Loading

0 comments on commit 88365eb

Please sign in to comment.