From b5918c3774236d75f1fbb48c2d3fa6f6ca3d8217 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Fri, 11 Aug 2023 10:28:25 +0900 Subject: [PATCH] [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute ### What changes were proposed in this pull request? Simplify retries of ReleaseExecute in ExecutePlanResponseReattachableIterator. Instead of chaining asynchronous calls, use the common retry loop logic from the asynchronous onError after the first error. This allows reusing the common `retry` function instead of having to duplicate ``` case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries => Thread.sleep( (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis) ``` logic. This also makes these retries more similar to how they are implemented in the Python client. ### Why are the changes needed? Code simplification. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 
Checked retries by printing `new Exception().printStackTrace` from the handler: ``` java.lang.Exception at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242) at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.Exception at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242) at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306) 
at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ... ``` Closes #42438 from juliuszsompolski/SPARK-44765. 
Authored-by: Juliusz Sompolski Signed-off-by: Hyukjin Kwon --- ...cutePlanResponseReattachableIterator.scala | 36 +++++++++++-------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 7a50801d8a6e5..5ef1151682b1d 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -180,7 +180,7 @@ class ExecutePlanResponseReattachableIterator( private def releaseUntil(untilResponseId: String): Unit = { if (!resultComplete) { val request = createReleaseExecuteRequest(Some(untilResponseId)) - rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request)) } } @@ -194,7 +194,7 @@ class ExecutePlanResponseReattachableIterator( private def releaseAll(): Unit = { if (!resultComplete) { val request = createReleaseExecuteRequest(None) - rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request)) resultComplete = true } } @@ -229,22 +229,28 @@ class ExecutePlanResponseReattachableIterator( * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the * callback will retrigger the asynchronous ReleaseExecute. 
*/ - private def createRetryingReleaseExecuteResponseObserer( - requestForRetry: proto.ReleaseExecuteRequest, - currentRetryNum: Int = 0): StreamObserver[proto.ReleaseExecuteResponse] = { + private def createRetryingReleaseExecuteResponseObserver( + requestForRetry: proto.ReleaseExecuteRequest) + : StreamObserver[proto.ReleaseExecuteResponse] = { new StreamObserver[proto.ReleaseExecuteResponse] { override def onNext(v: proto.ReleaseExecuteResponse): Unit = {} override def onCompleted(): Unit = {} - override def onError(t: Throwable): Unit = t match { - case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries => - Thread.sleep( - (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math - .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis) - rawAsyncStub.releaseExecute( - requestForRetry, - createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1)) - case _ => - logWarning(s"ReleaseExecute failed with exception: $t.") + override def onError(t: Throwable): Unit = { + var firstTry = true + try { + retry { + if (firstTry) { + firstTry = false + throw t // we already failed once, handle first retry + } else { + // we already are in async execution thread, can execute further retries sync + rawBlockingStub.releaseExecute(requestForRetry) + } + } + } catch { + case NonFatal(e) => + logWarning(s"ReleaseExecute failed with exception: $e.") + } } } }