diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 09fc88de493da..1a3936195f9bf 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -269,7 +269,7 @@ object MimaExcludes {
     ProblemFilters.exclude[Problem]("org.apache.spark.sql.protobuf.utils.SchemaConverters.*"),
 
     // SPARK-51267: Match local Spark Connect server logic between Python and Scala
-    ProblemFilters.exclude[MissingFieldProblem]("org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE")
+    ProblemFilters.exclude[MissingFieldProblem]("org.apache.spark.launcher.SparkLauncher.SPARK_LOCAL_REMOTE"),
     (problem: Problem) => problem match {
       case MissingClassProblem(cls) =>
         !cls.fullName.startsWith("org.sparkproject.jpmml") &&
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
index bee90c77f348d..a64f29f6583b7 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/SparkSession.scala
@@ -696,9 +696,7 @@ object SparkSession extends SparkSessionCompanion with Logging {
     Option(System.getenv("SPARK_HOME")).map(Paths.get(_, "sbin", "stop-connect-server.sh"))
 
   private[sql] val sparkOptions = sys.props.filter { p =>
     p._1.startsWith("spark.") && p._2.nonEmpty
-  }.toMap ++ Map(
-    "spark.sql.artifact.isolation.enabled" -> "true",
-    "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true")
+  }.toMap
 
   private val sessions = CacheBuilder
     .newBuilder()
@@ -712,60 +710,62 @@ object SparkSession extends SparkSessionCompanion with Logging {
    * Create a new Spark Connect server to connect locally.
    */
   private[sql] def withLocalConnectServer[T](f: => T): T = {
-    synchronized {
-      lazy val isAPIModeConnect =
-        Option(System.getProperty(org.apache.spark.sql.SparkSessionBuilder.API_MODE_KEY))
-          .getOrElse("classic")
-          .toLowerCase(Locale.ROOT) == "connect"
-      val remoteString = sparkOptions
-        .get("spark.remote")
-        .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
-        .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
-        .orElse {
-          if (isAPIModeConnect) {
-            sparkOptions.get("spark.master").orElse(sys.env.get("MASTER"))
-          } else {
-            None
-          }
+    lazy val isAPIModeConnect =
+      Option(System.getProperty(org.apache.spark.sql.SparkSessionBuilder.API_MODE_KEY))
+        .getOrElse("classic")
+        .toLowerCase(Locale.ROOT) == "connect"
+    val remoteString = sparkOptions
+      .get("spark.remote")
+      .orElse(Option(System.getProperty("spark.remote"))) // Set from Spark Submit
+      .orElse(sys.env.get(SparkConnectClient.SPARK_REMOTE))
+      .orElse {
+        if (isAPIModeConnect) {
+          sparkOptions.get("spark.master").orElse(sys.env.get("MASTER"))
+        } else {
+          None
         }
+      }
 
-      server.synchronized {
-        if (server.isEmpty &&
-          (remoteString.exists(_.startsWith("local")) ||
-          (remoteString.isDefined && isAPIModeConnect)) &&
-          maybeConnectStartScript.exists(Files.exists(_))) {
-          server = Some {
-            val args =
-              Seq(
-                maybeConnectStartScript.get.toString,
-                "--master",
-                remoteString.get) ++ sparkOptions
-                .filter(p => !p._1.startsWith("spark.remote"))
-                .filter(p => !p._1.startsWith("spark.api.mode"))
-                .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
-            val pb = new ProcessBuilder(args: _*)
-            // So don't exclude spark-sql jar in classpath
-            pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
-            pb.environment().put("SPARK_LOCAL_CONNECT", "1")
-            pb.start()
-          }
+    server.synchronized {
+      if (server.isEmpty &&
+        (remoteString.exists(_.startsWith("local")) ||
+        (remoteString.isDefined && isAPIModeConnect)) &&
+        maybeConnectStartScript.exists(Files.exists(_))) {
+        server = Some {
+          val args =
+            Seq(
+              maybeConnectStartScript.get.toString,
+              "--master",
+              remoteString.get) ++ (sparkOptions ++ Map(
+              "spark.sql.artifact.isolation.enabled" -> "true",
+              "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true"))
+              .filter(p => !p._1.startsWith("spark.remote"))
+              .filter(p => !p._1.startsWith("spark.api.mode"))
+              .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
+          val pb = new ProcessBuilder(args: _*)
+          // So don't exclude spark-sql jar in classpath
+          pb.environment().remove(SparkConnectClient.SPARK_REMOTE)
+          pb.start()
+        }
 
-          // Let the server start. We will directly request to set the configurations
-          // and this sleep makes less noisy with retries.
-          Thread.sleep(2000L)
-          System.setProperty("spark.remote", "sc://localhost")
+        // Let the server start. We will directly request to set the configurations
+        // and this sleep makes less noisy with retries.
+        Thread.sleep(2000L)
+        System.setProperty("spark.remote", "sc://localhost")
 
-          // scalastyle:off runtimeaddshutdownhook
-          Runtime.getRuntime.addShutdownHook(new Thread() {
-            override def run(): Unit = if (server.isDefined) {
+        // scalastyle:off runtimeaddshutdownhook
+        Runtime.getRuntime.addShutdownHook(new Thread() {
+          override def run(): Unit = server.synchronized {
+            if (server.isDefined) {
               new ProcessBuilder(maybeConnectStopScript.get.toString)
                 .start()
             }
-          })
-          // scalastyle:on runtimeaddshutdownhook
-        }
+          }
+        })
+        // scalastyle:on runtimeaddshutdownhook
       }
     }
+    f
   }
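
Note: as a minimal, self-contained sketch of what the rewritten argument assembly on the "+" side produces. The sample options, the script path, and the LauncherArgsSketch object below are illustrative stand-ins, not part of this patch; only the two spark.sql.artifact.isolation.* confs and the filter logic come from the hunk above.

object LauncherArgsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical client-side options, standing in for sys.props-derived sparkOptions.
    val sparkOptions = Map(
      "spark.remote" -> "local",     // stripped below, as in the patch
      "spark.api.mode" -> "connect", // stripped below, as in the patch
      "spark.ui.enabled" -> "false") // forwarded to the server as a --conf

    // The isolation confs are merged in only here, at launch time, so they
    // apply to the local server process without leaking into sparkOptions.
    val launcherArgs =
      Seq("./sbin/start-connect-server.sh", "--master", "local") ++
        (sparkOptions ++ Map(
          "spark.sql.artifact.isolation.enabled" -> "true",
          "spark.sql.artifact.isolation.alwaysApplyClassloader" -> "true"))
          .filter(p => !p._1.startsWith("spark.remote"))
          .filter(p => !p._1.startsWith("spark.api.mode"))
          .flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

    // Prints the script, --master, and one "--conf k=v" pair per surviving
    // entry; map iteration order is unspecified.
    println(launcherArgs.mkString(" "))
  }
}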
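
A second sketch, of the locking shape after this change: the method-wide synchronized is gone, one monitor guards both the start-once check and the shutdown hook's stop, and the caller's body runs outside the lock. The lock, fields, scripts, and withLocalServer name are hypothetical stand-ins rather than Spark's actual members.

object LocalServerLifecycleSketch {
  private val lock = new Object
  private var server: Option[Process] = None

  def withLocalServer[T](startScript: String, stopScript: String)(f: => T): T = {
    lock.synchronized {
      if (server.isEmpty) {
        server = Some(new ProcessBuilder(startScript).start())
        // The hook takes the same monitor, so it cannot race the start above
        // and only runs the stop script once a server was actually launched.
        Runtime.getRuntime.addShutdownHook(new Thread(() =>
          lock.synchronized {
            if (server.isDefined) {
              new ProcessBuilder(stopScript).start()
            }
          }))
      }
    }
    f // the user's thunk runs outside the lock, as in the patch
  }
}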