Commit 00955b7

fixup

HyukjinKwon committed Feb 21, 2025
1 parent 227c613 commit 00955b7
Showing 2 changed files with 49 additions and 49 deletions.
2 changes: 1 addition & 1 deletion project/MimaExcludes.scala
@@ -269,7 +269,7 @@ object MimaExcludes {
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"),

     // SPARK-51267: Match local Spark Connect server logic between Python and Scala
-    ProblemFilters.exclude[MissingFieldProblem]("org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE")
+    ProblemFilters.exclude[MissingFieldProblem]("org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE"),

     (problem: Problem) => problem match {
       case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml") &&
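
The MimaExcludes change is a one-character compile fix: the parent commit added the SPARK_LOCAL_REMOTE exclude as a new element of the excludes Seq but left off the comma before the function literal that follows it. A minimal sketch of the shape involved (a hypothetical ExcludesSketch object, abridged from the lines above; assumes mima-core on the classpath, as in the sbt build):

import com.typesafe.tools.mima.core._

object ExcludesSketch {
  // Each rule is one element of a Seq[ProblemFilter]; the entry before the
  // closing function literal needs the trailing comma this fixup adds.
  val changes: Seq[ProblemFilter] = Seq(
    ProblemFilters.exclude[MissingFieldProblem](
      "org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE"), // <- the fixup
    (problem: Problem) => problem match {
      case MissingClassProblem(cls) => !cls.fullName.startsWith("org.sparkproject.jpmml")
      case _ => true
    })
}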
@@ -696,9 +696,7 @@ object SparkSession extends SparkSessionCompanion with Logging {
     Option(System.getenv("SPARK_HOME")).map(Paths.get(_, "sbin", "stop-connect-server.sh"))
   private[sql] val sparkOptions = sys.props.filter { p =>
     p._1.startsWith("spark.") && p._2.nonEmpty
-  }.toMap ++ Map(
-    "spark.sql.artifact.isolation.enabled" -> "true",
-    "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true")
+  }.toMap

   private val sessions = CacheBuilder
     .newBuilder()
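
After this hunk, sparkOptions is again just a snapshot of the caller's spark.* JVM system properties; the two artifact-isolation settings no longer ride along on every client-side read of the map (they reappear below, scoped to the server launch). A self-contained sketch of what the val now computes (the SparkOptionsSketch object and the property values are illustrative, not from the source):

// Self-contained sketch of the new sparkOptions: spark.*-prefixed system
// properties with empty values dropped, and nothing else mixed in.
object SparkOptionsSketch extends App {
  System.setProperty("spark.app.name", "demo")
  System.setProperty("spark.blank", "")    // dropped: empty value
  System.setProperty("other.key", "x")     // dropped: wrong prefix
  val sparkOptions = sys.props.filter { p =>
    p._1.startsWith("spark.") && p._2.nonEmpty
  }.toMap
  // Prints spark.app.name -> demo (plus any spark.* props the JVM was given).
  println(sparkOptions)
}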
@@ -712,60 +710,62 @@ object SparkSession extends SparkSessionCompanion with Logging {
   * Create a new Spark Connect server to connect locally.
   */
  private[sql] def withLocalConnectServer[T](f: => T): T = {
-    synchronized {
-      lazy val isAPIModeConnect =
-        Option(System.getProperty(org.apache.spark.sql.SparkSessionBuilder.API_MODE_KEY))
-          .getOrElse("classic")
-          .toLowerCase(Locale.ROOT) == "connect"
-      val remoteString = sparkOptions
-        .get("spark.remote")
-        .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
-        .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
-        .orElse {
-          if (isAPIModeConnect) {
-            sparkOptions.get("spark.master").orElse(sys.env.get("MASTER"))
-          } else {
-            None
-          }
-        }
+    lazy val isAPIModeConnect =
+      Option(System.getProperty(org.apache.spark.sql.SparkSessionBuilder.API_MODE_KEY))
+        .getOrElse("classic")
+        .toLowerCase(Locale.ROOT) == "connect"
+    val remoteString = sparkOptions
+      .get("spark.remote")
+      .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
+      .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
+      .orElse {
+        if (isAPIModeConnect) {
+          sparkOptions.get("spark.master").orElse(sys.env.get("MASTER"))
+        } else {
+          None
+        }
+      }

-      server.synchronized {
-        if (server.isEmpty &&
-          (remoteString.exists(_.startsWith("local")) ||
-            (remoteString.isDefined && isAPIModeConnect)) &&
-          maybeConnectStartScript.exists(Files.exists(_))) {
-          server = Some {
-            val args =
-              Seq(
-                maybeConnectStartScript.get.toString,
-                "--master",
-                remoteString.get) ++ sparkOptions
-                .filter(p => !p._1.startsWith("spark.remote"))
-                .filter(p => !p._1.startsWith("spark.api.mode"))
-                .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
-            val pb = new ProcessBuilder(args: _*)
-            // So don't exclude spark-sql jar in classpath
-            pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
-            pb.environment().put("SPARK_LOCAL_CONNECT", "1")
-            pb.start()
-          }
+    server.synchronized {
+      if (server.isEmpty &&
+        (remoteString.exists(_.startsWith("local")) ||
+          (remoteString.isDefined && isAPIModeConnect)) &&
+        maybeConnectStartScript.exists(Files.exists(_))) {
+        server = Some {
+          val args =
+            Seq(
+              maybeConnectStartScript.get.toString,
+              "--master",
+              remoteString.get) ++ (sparkOptions ++ Map(
+              "spark.sql.artifact.isolation.enabled" -> "true",
+              "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true"))
+              .filter(p => !p._1.startsWith("spark.remote"))
+              .filter(p => !p._1.startsWith("spark.api.mode"))
+              .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
+          val pb = new ProcessBuilder(args: _*)
+          // So don't exclude spark-sql jar in classpath
+          pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
+          pb.start()
+        }

-          // Let the server start. We will directly request to set the configurations
-          // and this sleep makes less noisy with retries.
-          Thread.sleep(2000L)
-          System.setProperty("spark.remote", "sc://localhost")
+        // Let the server start. We will directly request to set the configurations
+        // and this sleep makes less noisy with retries.
+        Thread.sleep(2000L)
+        System.setProperty("spark.remote", "sc://localhost")

-          // scalastyle:off runtimeaddshutdownhook
-          Runtime.getRuntime.addShutdownHook(new Thread() {
-            override def run(): Unit = if (server.isDefined) {
+        // scalastyle:off runtimeaddshutdownhook
+        Runtime.getRuntime.addShutdownHook(new Thread() {
+          override def run(): Unit = server.synchronized {
+            if (server.isDefined) {
               new ProcessBuilder(maybeConnectStopScript.get.toString)
                 .start()
             }
-          })
-          // scalastyle:on runtimeaddshutdownhook
-        }
-      }
-    }
+          }
+        })
+        // scalastyle:on runtimeaddshutdownhook
+      }
+    }

     f
   }
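
The substantive part of the hunk is where the isolation settings land: instead of being baked into sparkOptions, they are overlaid onto it only while building the start script's command line, then flattened into repeated --conf flags. A standalone sketch of that assembly step (the LaunchArgsSketch object, script path, and option values are illustrative, not taken from a real environment):

// Standalone sketch: overlay the isolation confs on the user's options, drop
// the client-only keys, and flatten the rest into --conf key=value pairs.
object LaunchArgsSketch extends App {
  val sparkOptions = Map("spark.remote" -> "local", "spark.app.name" -> "demo")
  val args =
    Seq("./sbin/start-connect-server.sh", "--master", "local") ++
      (sparkOptions ++ Map(
        "spark.sql.artifact.isolation.enabled" -> "true",
        "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true"))
        .filter(p => !p._1.startsWith("spark.remote"))
        .filter(p => !p._1.startsWith("spark.api.mode"))
        .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
  // Prints the script, --master local, then --conf spark.app.name=demo and the
  // two isolation confs; spark.remote never reaches the server command line.
  println(args.mkString(" "))
}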

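The rest of the hunk is a concurrency cleanup: the outer synchronized on the companion object is dropped, launch state is guarded by server.synchronized alone, and the shutdown hook now re-enters server.synchronized around its server.isDefined check instead of checking unguarded. A reduced sketch of the pattern, mirroring the diff's choice of locking on the server Option itself (ShutdownHookSketch and fakeStart are hypothetical; a Unix sleep process and destroy() stand in for the start/stop scripts):

// Reduced sketch: both the check-then-start and the shutdown hook's
// check-then-stop run inside server.synchronized, as in the diff.
object ShutdownHookSketch {
  private var server: Option[Process] = None

  private def fakeStart(): Process = new ProcessBuilder("sleep", "60").start()

  def ensureStarted(): Unit = server.synchronized {
    if (server.isEmpty) {
      server = Some(fakeStart())
      Runtime.getRuntime.addShutdownHook(new Thread() {
        override def run(): Unit = server.synchronized {
          if (server.isDefined) server.get.destroy() // stop under the guard
        }
      })
    }
  }

  def main(args: Array[String]): Unit = ensureStarted()
}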
