Skip to content

Commit

Permalink
[SPARK-50735][CONNECT] Failure in ExecuteResponseObserver results in …
Browse files Browse the repository at this point in the history
…infinite reattaching requests

### What changes were proposed in this pull request?

The Spark Connect reattach request handler checks whether the associated ExecuteThreadRunner is completed, and returns an error if it has failed to record the outcome.

### Why are the changes needed?

ExecuteResponseObserver.{onError, onComplete} are fallible while they are not retried; this leads to a situation where the ExecuteThreadRunner is completed without succeeding in responding to the client, and thus the client keeps retrying by reattaching the execution.

To be specific, if an ExecuteThreadRunner fails to record the completion of execution or an error on the observer and then just disappears, the client will endlessly reattach, hoping that "someone" will eventually record "some data," but since the ExecuteThreadRunner is gone, this becomes a deadlock situation.

The fix is that when the client attaches, the handler checks the status of the ExecuteThreadRunner, and if it finds that the execution cannot make any progress, an error is returned.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

testOnly org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49370 from changgyoopark-db/SPARK-50735.

Lead-authored-by: changgyoopark-db <changgyoo.park@databricks.com>
Co-authored-by: Changgyoo Park <164183463+changgyoopark-db@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
2 people authored and HyukjinKwon committed Jan 20, 2025
1 parent efeb1e0 commit 07aa4ff
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
finalProducedIndex.isDefined
}

// For testing.
private[connect] def undoCompletion(): Unit = responseLock.synchronized {
finalProducedIndex = None
}

/**
* Remove cached responses after response with lastReturnedIndex is returned from getResponse.
* Remove according to caching policy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
}
}

/**
* Checks if the thread is alive.
*
* @return
* true if the execution thread is currently running.
*/
private[connect] def isAlive(): Boolean = {
executionThread.isAlive()
}

/**
* Interrupts the execution thread if the execution has been interrupted by this method call.
*
Expand Down Expand Up @@ -288,7 +298,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends
* True if we should delegate sending the final ResultComplete to the handler thread, i.e.
* don't send a ResultComplete when the ExecuteThread returns.
*/
private def shouldDelegateCompleteResponse(request: proto.ExecutePlanRequest): Boolean = {
private[connect] def shouldDelegateCompleteResponse(
request: proto.ExecutePlanRequest): Boolean = {
request.getPlan.getOpTypeCase == proto.Plan.OpTypeCase.COMMAND &&
request.getPlan.getCommand.getCommandTypeCase ==
proto.Command.CommandTypeCase.STREAMING_QUERY_LISTENER_BUS_COMMAND &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ private[connect] class ExecuteHolder(
runner.start()
}

/**
* Check if the execution was ended without finalizing the outcome and no further progress will
* be made. If the execution was delegated, this method always returns false.
*/
def isOrphan(): Boolean = {
!runner.isAlive() &&
!runner.shouldDelegateCompleteResponse(request) &&
!responseObserver.completed()
}

def addObservation(name: String, observation: Observation): Unit = synchronized {
observations += (name -> observation)
}
Expand Down Expand Up @@ -195,6 +205,16 @@ private[connect] class ExecuteHolder(
grpcResponseSenders.foreach(_.interrupt())
}

// For testing
private[connect] def undoResponseObserverCompletion() = synchronized {
responseObserver.undoCompletion()
}

// For testing
private[connect] def isExecuteThreadRunnerAlive() = {
runner.isAlive()
}

/**
* For a short period in ExecutePlan after creation and until runGrpcResponseSender is called,
* there is no attached response sender, but yet we start with lastAttachedRpcTime = None, so we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class SparkConnectReattachExecuteHandler(
throw new SparkSQLException(
errorClass = "INVALID_CURSOR.NOT_REATTACHABLE",
messageParameters = Map.empty)
} else if (executeHolder.isOrphan()) {
logWarning("Reattach to an orphan operation.")
SparkConnectService.executionManager.removeExecuteHolder(executeHolder.key)
throw new IllegalStateException("Operation was orphaned because of an internal error.")
}

val responseSender =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkException
import org.apache.spark.sql.connect.SparkConnectServerTest
import org.apache.spark.sql.connect.config.Connect

class SparkConnectServiceE2ESuite extends SparkConnectServerTest {

Expand Down Expand Up @@ -267,4 +268,25 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
iter2.hasNext
}
}

test("Exceptions thrown in the gRPC response observer does not lead to infinite retries") {
withSparkEnvConfs(
(Connect.CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION.key, "10")) {
withClient { client =>
val query = client.execute(buildPlan("SELECT 1"))
query.hasNext
val execution = eventuallyGetExecutionHolder
Eventually.eventually(timeout(eventuallyTimeout)) {
assert(!execution.isExecuteThreadRunnerAlive())
}

execution.undoResponseObserverCompletion()

val error = intercept[SparkException] {
while (query.hasNext) query.next()
}
assert(error.getMessage.contains("IllegalStateException"))
}
}
}
}

0 comments on commit 07aa4ff

Please sign in to comment.