[SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
### What changes were proposed in this pull request?

Simplify retries of ReleaseExecute in ExecutePlanResponseReattachableIterator.
Instead of chaining asynchronous calls, invoke the common retry loop logic from the asynchronous `onError` callback after the first error.
This makes it possible to reuse the common `retry` function instead of duplicating the following backoff logic:
```
        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
          Thread.sleep(
            (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
              .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
```

This also makes these retries more similar to how they are handled in the Python client.
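
For context, the duplicated snippet above is a single inlined step of an exponential-backoff retry. The common helper referred to here, along the lines of `GrpcRetryHandler.retry` (visible in the stack traces under testing below), centralizes that loop. The following is only a minimal sketch: the `RetryPolicy` shape, its defaults, the `canRetry` condition, and the `RetrySketch` name are illustrative assumptions, not the actual client code.

```scala
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical, simplified sketch, not the actual client classes. The field names
// mirror the ones used in the duplicated snippet above; the defaults are illustrative.
object RetrySketch {
  case class RetryPolicy(
      maxRetries: Int = 15,
      initialBackoff: FiniteDuration = 50.millis,
      maxBackoff: FiniteDuration = 1.minute,
      backoffMultiplier: Double = 4.0) {
    // Illustrative predicate; the real policy inspects the underlying gRPC status.
    def canRetry(t: Throwable): Boolean = t.isInstanceOf[java.io.IOException]
  }

  // Common retry loop along the lines of GrpcRetryHandler.retry: evaluate `fn`, and on a
  // retryable failure sleep with exponential backoff and try again, up to maxRetries times.
  def retry[T](policy: RetryPolicy, currentRetryNum: Int = 0)(fn: => T): T = {
    try {
      fn
    } catch {
      case NonFatal(e) if policy.canRetry(e) && currentRetryNum < policy.maxRetries =>
        Thread.sleep(
          (policy.maxBackoff min policy.initialBackoff * Math
            .pow(policy.backoffMultiplier, currentRetryNum)).toMillis)
        retry(policy, currentRetryNum + 1)(fn)
    }
  }
}
```

With such a helper, a call site in this sketch just wraps the operation, e.g. `retry(policy) { rawBlockingStub.releaseExecute(requestForRetry) }`, and the backoff bookkeeping lives in one place.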

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Checked retries by printing a stack trace with `new Exception().printStackTrace()` from the handler:
```
java.lang.Exception
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
        at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
java.lang.Exception
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
        at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
        at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
        at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
...
```

Closes apache#42438 from juliuszsompolski/SPARK-44765.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
juliuszsompolski authored and hvanhovell committed Aug 13, 2023
1 parent 06f09eb commit b5918c3
Showing 1 changed file with 21 additions and 15 deletions.

```
@@ -180,7 +180,7 @@ class ExecutePlanResponseReattachableIterator(
   private def releaseUntil(untilResponseId: String): Unit = {
     if (!resultComplete) {
       val request = createReleaseExecuteRequest(Some(untilResponseId))
-      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request))
+      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request))
     }
   }
 
@@ -194,7 +194,7 @@
   private def releaseAll(): Unit = {
     if (!resultComplete) {
       val request = createReleaseExecuteRequest(None)
-      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request))
+      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request))
       resultComplete = true
     }
   }
@@ -229,22 +229,28 @@
    * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the
    * callback will retrigger the asynchronous ReleaseExecute.
    */
-  private def createRetryingReleaseExecuteResponseObserer(
-      requestForRetry: proto.ReleaseExecuteRequest,
-      currentRetryNum: Int = 0): StreamObserver[proto.ReleaseExecuteResponse] = {
+  private def createRetryingReleaseExecuteResponseObserver(
+      requestForRetry: proto.ReleaseExecuteRequest)
+      : StreamObserver[proto.ReleaseExecuteResponse] = {
     new StreamObserver[proto.ReleaseExecuteResponse] {
       override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
       override def onCompleted(): Unit = {}
-      override def onError(t: Throwable): Unit = t match {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
-          Thread.sleep(
-            (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-              .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
-          rawAsyncStub.releaseExecute(
-            requestForRetry,
-            createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))
-        case _ =>
-          logWarning(s"ReleaseExecute failed with exception: $t.")
+      override def onError(t: Throwable): Unit = {
+        var firstTry = true
+        try {
+          retry {
+            if (firstTry) {
+              firstTry = false
+              throw t // we already failed once, handle first retry
+            } else {
+              // we already are in async execution thread, can execute further retries sync
+              rawBlockingStub.releaseExecute(requestForRetry)
+            }
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"ReleaseExecute failed with exception: $e.")
+        }
+      }
       }
     }
   }
```
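
Isolated from gRPC, the `firstTry` trick in the new `onError` above boils down to the following pattern. This is a minimal sketch, not the actual client code, reusing the hypothetical `RetrySketch` helper from earlier, with `reissueCall` standing in for `rawBlockingStub.releaseExecute(requestForRetry)`.

```scala
import scala.util.control.NonFatal
import RetrySketch._

object ReleaseRetrySketch {
  // The error that already happened asynchronously is rethrown as the "first attempt", so
  // the common retry loop applies its usual canRetry/backoff decision to it. Subsequent
  // attempts re-issue the call synchronously, since we are already on the callback thread.
  def onAsyncFailure(t: Throwable, policy: RetryPolicy)(reissueCall: => Unit): Unit = {
    var firstTry = true
    try {
      retry(policy) {
        if (firstTry) {
          firstTry = false
          throw t // count the original asynchronous failure as the first attempt
        } else {
          reissueCall // stand-in for rawBlockingStub.releaseExecute(requestForRetry)
        }
      }
    } catch {
      case NonFatal(e) =>
        // Retries exhausted or the error is not retryable: ReleaseExecute is best effort,
        // so the failure is only logged rather than propagated.
        println(s"ReleaseExecute failed with exception: $e.")
    }
  }
}
```

Rethrowing `t` on the first iteration keeps all attempts, including the one that already failed asynchronously, under a single retry budget, instead of threading a separate `currentRetryNum` through the observer as before.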
