From 87b2ed7cfca43ae507a7bbaf3f49d68cc031d7e3 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Sat, 22 Feb 2025 09:23:52 +0900
Subject: [PATCH] [SPARK-51279][CONNECT] Avoid constant sleep for waiting
 Spark Connect server in Scala

### What changes were proposed in this pull request?

This PR proposes to address https://github.com/apache/spark/pull/50017#discussion_r1963027036 by avoiding a constant sleep and instead waiting until the log file is created by the local Spark Connect server.

### Why are the changes needed?

To make the startup wait robust instead of relying on a fixed two-second sleep.

### Does this PR introduce _any_ user-facing change?

Possibly: users may no longer see retry log messages while the server starts.

### How was this patch tested?

Manually tested via `./bin/spark-shell --remote local`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #50039 from HyukjinKwon/avoid-sleep.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 666f45d9e5ef1e2999465c752182a4bd7b8d7151)
Signed-off-by: Hyukjin Kwon
---
 .../spark/sql/connect/SparkSession.scala      | 59 +++++++++++++++++--
 1 file changed, 54 insertions(+), 5 deletions(-)

diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
index a64f29f6583b7..c4067ea3ac330 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
@@ -16,12 +16,16 @@
  */
 package org.apache.spark.sql.connect
 
+import java.io.File
 import java.net.URI
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, FileSystems, Path, Paths, StandardWatchEventKinds}
 import java.util.Locale
+import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Try
@@ -37,7 +41,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.ExecutePlanResponse.ObservedMetrics
 import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys.CONFIG
+import org.apache.spark.internal.LogKeys.{CONFIG, PATH}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql
 import org.apache.spark.sql.{Column, Encoder, ExperimentalMethods, Observation, Row, SparkSessionBuilder, SparkSessionCompanion, SparkSessionExtensions}
@@ -706,6 +710,33 @@ object SparkSession extends SparkSessionCompanion with Logging {
       override def load(c: Configuration): SparkSession = create(c)
     })
 
+  private def waitUntilFileExists(file: File): Unit = {
+    val deadline = 30.seconds.fromNow
+    val watchService = FileSystems.getDefault.newWatchService()
+    try {
+      file.toPath.getParent.register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
+      while (!file.exists() && deadline.hasTimeLeft()) {
+        Option(watchService.poll(deadline.timeLeft.toSeconds + 1, TimeUnit.SECONDS)) match {
+          case Some(key) =>
+            key.pollEvents().forEach { event =>
+              val kind = event.kind()
+              val filename = event.context().asInstanceOf[Path]
+
+              if (kind == StandardWatchEventKinds.ENTRY_CREATE
+                && filename.toString == file.toPath.getFileName.toString) {
+                key.cancel()
+                return
+              }
+            }
+            key.reset()
+          case None =>
+        }
+      }
+    } finally {
+      watchService.close()
+    }
+  }
+
   /**
    * Create a new Spark Connect server to connect locally.
    */
@@ -731,6 +762,7 @@ object SparkSession extends SparkSessionCompanion with Logging {
       (remoteString.exists(_.startsWith("local")) ||
         (remoteString.isDefined && isAPIModeConnect)) &&
       maybeConnectStartScript.exists(Files.exists(_))) {
+      val serverId = UUID.randomUUID().toString
       server = Some {
         val args =
           Seq(
@@ -745,12 +777,29 @@ object SparkSession extends SparkSessionCompanion with Logging {
         val pb = new ProcessBuilder(args: _*)
         // So don't exclude spark-sql jar in classpath
         pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
+        pb.environment().put("SPARK_IDENT_STRING", serverId)
+        pb.environment().put("HOSTNAME", "local")
         pb.start()
       }
 
-      // Let the server start. We will directly request to set the configurations
-      // and this sleep makes less noisy with retries.
-      Thread.sleep(2000L)
+      // Let the server start, and wait until the log file is created.
+      Option(System.getenv("SPARK_LOG_DIR"))
+        .orElse(Option(System.getenv("SPARK_HOME")).map(p => Paths.get(p, "logs").toString))
+        .foreach { p =>
+          val logFile = Paths
+            .get(
+              p,
+              s"spark-$serverId-" +
+                s"org.apache.spark.sql.connect.service.SparkConnectServer-1-local.out")
+            .toFile
+          waitUntilFileExists(logFile)
+          if (logFile.exists()) {
+            logInfo(log"Spark Connect server started with the log file: ${MDC(PATH, logFile)}")
+          } else {
+            logWarning(log"Spark Connect server log not found at ${MDC(PATH, logFile)}")
+          }
+        }
+
       System.setProperty("spark.remote", "sc://localhost")
 
       // scalastyle:off runtimeaddshutdownhook
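
Note for readers: the core idea of the patch is to replace `Thread.sleep(2000L)` with a `WatchService`-based wait for the server's log file. Below is a minimal, self-contained sketch of that pattern, not the patch's actual code: the object name `WaitForFile`, the fallback path, and the 30-second timeout are illustrative assumptions.

```scala
import java.nio.file.{Files, FileSystems, Path, Paths, StandardWatchEventKinds}
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

object WaitForFile {
  // Waits (up to `timeout`) for `file` to appear by watching its parent
  // directory for ENTRY_CREATE events instead of sleeping a fixed interval.
  def waitUntilExists(file: Path, timeout: FiniteDuration): Boolean = {
    val deadline = timeout.fromNow
    val watchService = FileSystems.getDefault.newWatchService()
    try {
      // Register the watch first, then check existence: a file created in
      // between is caught by either the check or a subsequent event.
      file.toAbsolutePath.getParent
        .register(watchService, StandardWatchEventKinds.ENTRY_CREATE)
      while (!Files.exists(file) && deadline.hasTimeLeft()) {
        // Block until something is created in the directory or time runs out.
        val key = watchService.poll(deadline.timeLeft.toMillis max 1L, TimeUnit.MILLISECONDS)
        if (key != null) {
          key.pollEvents() // drain events; the loop condition re-checks existence
          key.reset() // re-arm the key so further events are delivered
        }
      }
      Files.exists(file)
    } finally {
      watchService.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical log path; the patch derives the real one from
    // SPARK_LOG_DIR/SPARK_HOME, SPARK_IDENT_STRING, and HOSTNAME.
    val logFile = Paths.get(sys.env.getOrElse("SPARK_LOG_DIR", "/tmp"), "server.out")
    println(s"log file appeared: ${waitUntilExists(logFile, 30.seconds)}")
  }
}
```

One design point worth noting: re-checking `Files.exists` in the loop condition after registering the watch closes the race where the file is created between the existence check and registration; relying on events alone could miss a file that appeared before the watch was armed.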