Skip to content

Commit

Permalink
Java CDK: Fix CompletableFutures.allOf to handle empty list (#41680)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jul 15, 2024
1 parent 1aac2ba commit a859a7d
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 41 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.41.3 | 2024-07-15 | [\#41680](https://github.com/airbytehq/airbyte/pull/41680) | Fix: CompletableFutures.allOf now handles empty list and `Throwable` |
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |
| 0.41.1 | 2024-07-11 | [\#41212](https://github.com/airbytehq/airbyte/pull/41212) | Improve debezium logging. |
| 0.41.0 | 2024-07-11 | [\#38240](https://github.com/airbytehq/airbyte/pull/38240) | Sources : Changes in CDC interfaces to support WASS algorithm |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.41.2
version=0.41.3
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,53 @@
package io.airbyte.commons.concurrency

import io.airbyte.commons.functional.Either
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.CompletionStage
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.flow.*
import java.util.concurrent.ExecutionException

object CompletableFutures {
/**
* Non-blocking implementation which does not use join. and returns an aggregated future. The
* order of results is preserved from the original list of futures.
*
* @param futures list of futures
* @param <Result> type of result
* @return a future that completes when all the input futures have completed </Result>
* @param Result type of result
* @return a future that completes when all the input futures have completed
*/
fun <Result> allOf(
futures: List<CompletionStage<Result>>
): CompletionStage<List<Either<out Exception, Result>>> {
val result = CompletableFuture<List<Either<out Exception, Result>>>()
val size = futures.size
val counter = AtomicInteger()
// This whole function should probably use kotlin flows, but I couldn't figure it out...
@Suppress("unchecked_cast")
val results =
java.lang.reflect.Array.newInstance(Either::class.java, size)
as Array<Either<Exception, Result>>
// attach a whenComplete to all futures

for (i in 0 until size) {
val currentIndex = i
futures[i].whenComplete { value: Result, exception: Throwable? ->
// if exception is null, then the future completed successfully
// maybe synchronization is unnecessary here, but it's better to be safe
synchronized(results) {
if (exception == null) {
results[currentIndex] = Either.right(value)
} else {
if (exception is Exception) {
results[currentIndex] = Either.left(exception)
} else {
// this should never happen
throw RuntimeException(
"Unexpected exception in a future completion.",
exception
)
}
val futuresArray: Array<CompletableFuture<Result>> =
futures.map { it.toCompletableFuture() }.toTypedArray()
return CompletableFuture.allOf(*futuresArray)
// We're going to get the individual exceptions from the futures,
// so we ignore the Throwable parameter here.
// stdlib's allOf() gives us a Void, so we ignore the value as well,
// and manually fetch the individual future values.
.handle { _: Void?, _: Throwable? ->
futures.map {
try {
// By the time we get here, the futures have already
// completed, so we can just get() them
Either.right(it.toCompletableFuture().get())
} catch (e: ExecutionException) {
// For historical reasons, we return the wrapped
// exception instead of just returning the underlying
// cause.
// For _other_ historical reasons, we rewrap this into
// a CompletionException.
// In practice, most callers will just check the value
// of `result.left.cause`, which doesn't care about
// the actual exception type.
Either.left(CompletionException(e.cause))
} catch (e: Exception) {
Either.left(e)
}
}
val completedCount = counter.incrementAndGet()
if (completedCount == size) {
result.complete(Arrays.asList(*results))
// handle() will take care of other Throwable types,
// so don't explicitly handle them.
// We want to crash loudly on e.g. OutOfMemoryError.
}
}
}
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
package io.airbyte.commons.concurrency

import io.airbyte.commons.functional.Either
import java.util.*
import java.time.Duration
import java.util.Arrays
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.fail
import org.junit.jupiter.api.Test

internal class CompletableFuturesTest {
Expand Down Expand Up @@ -45,6 +47,41 @@ internal class CompletableFuturesTest {
Assertions.assertEquals(failureMessages, mutableListOf("Fail 5", "Fail 6"))
}

@Test
fun allOfEmptyList() {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(5)) {
Assertions.assertEquals(
emptyList<String>(),
CompletableFutures.allOf(emptyList<CompletableFuture<String>>())
.toCompletableFuture()
.join(),
)
}
}

@Test
fun testFutureThrowingThrowable() {
val errorMessage = "Throwable1"
try {
val result =
CompletableFutures.allOf(
listOf(CompletableFuture.failedFuture<Int>(Throwable(errorMessage)))
)
.toCompletableFuture()
.get()
val failureMessages =
result
.filter { obj: Either<out Exception, Int> -> obj.isLeft() }
.map { either: Either<out Exception, Int> -> either.left!!.cause!!.message }
Assertions.assertEquals(
listOf(errorMessage),
failureMessages,
)
} catch (t: Throwable) {
fail(t)
}
}

private fun returnSuccessWithDelay(value: Int, delayMs: Long): CompletableFuture<Int> {
return CompletableFuture.supplyAsync {
try {
Expand Down

0 comments on commit a859a7d

Please sign in to comment.