[SPARK-50605][CONNECT] Support SQL API mode for easier migration to Spark Connect

### What changes were proposed in this pull request?

This PR proposes to add a new configuration called `spark.api.mode` (default `classic`) that allows existing Spark Classic applications to easily use Spark Connect.
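
As a sketch of the intended usage (the app name and master URL below are placeholders), an existing Classic application only needs the one extra configuration:

```python
from pyspark.sql import SparkSession

# Existing application code stays the same; only the extra config line is new.
spark = (
    SparkSession.builder
    .appName("existing-classic-app")      # placeholder app name
    .master("local[*]")                   # placeholder master URL
    .config("spark.api.mode", "connect")  # opt in to Spark Connect
    .getOrCreate()
)
spark.range(10).show()
spark.stop()
```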

### Why are the changes needed?

To let users easily try Spark Connect with their existing Spark Classic applications.

### Does this PR introduce _any_ user-facing change?

It adds a new configuration `spark.api.mode` without changing the default behaviour.

### How was this patch tested?

For PySpark applications, added unit tests for Spark submissions on YARN and Kubernetes, as well as manual tests.
For Scala applications, it is difficult to add a test because SBT automatically picks up the compiled jars on the classpath (whereas we can easily control this in a production environment).

For this case, I manually tested as below:

```bash
git clone https://github.com/HyukjinKwon/spark-connect-example
cd spark-connect-example
build/sbt package
cd ..
git clone https://github.com/apache/spark.git
cd spark
build/sbt package
# sbin/start-connect-server.sh
bin/spark-submit --name "testApp" --master "local[*]" --conf spark.api.mode=connect --class com.hyukjinkwon.SparkConnectExample ../spark-connect-example/target/scala-2.13/spark-connect-example_2.13-0.0.1.jar
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49107 from HyukjinKwon/api-mode-draft-3.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon committed Feb 6, 2025
1 parent 93e9f6e commit 96bd8f1
Showing 15 changed files with 254 additions and 19 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -358,7 +358,7 @@ jobs:
python-version: '3.11'
architecture: x64
- name: Install Python packages (Python 3.11)
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect')
if: (contains(matrix.modules, 'sql') && !contains(matrix.modules, 'sql-')) || contains(matrix.modules, 'connect') || contains(matrix.modules, 'yarn')
run: |
python3.11 -m pip install 'numpy>=1.20.0' pyarrow pandas scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.67.0' 'grpcio-status==1.67.0' 'protobuf==5.29.1'
python3.11 -m pip list
14 changes: 11 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy
import java.io.File
import java.net.URI
import java.nio.file.Files
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
@@ -52,9 +53,12 @@ object PythonRunner {
// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
val apiMode = sparkConf.get(SPARK_API_MODE).toLowerCase(Locale.ROOT)
val isAPIModeClassic = apiMode == "classic"
val isAPIModeConnect = apiMode == "connect"

var gatewayServer: Option[Py4JServer] = None
if (sparkConf.getOption("spark.remote").isEmpty) {
if (sparkConf.getOption("spark.remote").isEmpty || isAPIModeClassic) {
gatewayServer = Some(new Py4JServer(sparkConf))

val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.get.start() })
@@ -80,7 +84,7 @@
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
if (sparkConf.getOption("spark.remote").nonEmpty) {
if (sparkConf.getOption("spark.remote").nonEmpty || isAPIModeConnect) {
// For non-local remote, pass configurations to environment variables so
// Spark Connect client sets them. For local remotes, they will be set
// via Py4J.
@@ -90,7 +94,11 @@
env.put(s"PYSPARK_REMOTE_INIT_CONF_$idx", compact(render(group)))
}
}
sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
if (isAPIModeClassic) {
sparkConf.getOption("spark.master").foreach(url => env.put("MASTER", url))
} else {
sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url))
}
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
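
For illustration, a minimal Python sketch (an assumption about the hand-off, not code from PySpark itself) of how the launched driver process can tell the two modes apart from the environment variables set above:

```python
import os

if "SPARK_REMOTE" in os.environ:
    # spark.api.mode=connect (or spark.remote set): the Spark Connect client uses this URL.
    url = os.environ["SPARK_REMOTE"]
    mode = "connect"
else:
    # spark.api.mode=classic: the regular master URL, used alongside the Py4J gateway above.
    url = os.environ.get("MASTER", "local[*]")
    mode = "classic"
print(f"API mode: {mode}, URL: {url}")
```
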
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2821,4 +2821,15 @@ package object config {
.version("4.0.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val SPARK_API_MODE =
ConfigBuilder("spark.api.mode")
.doc("For Spark Classic applications, specify whether to automatically use Spark Connect " +
"by running a local Spark Connect server dedicated to the application. The server is " +
"terminated when the application is terminated. The value can be `classic` or `connect`.")
.version("4.0.0")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("connect", "classic"))
.createWithDefault("classic")
}
27 changes: 27 additions & 0 deletions docs/app-dev-spark-connect.md
@@ -58,6 +58,33 @@ to the client via the blue box as part of the Spark Connect API. The client uses
alongside PySpark or the Spark Scala client, making it easy for Spark client applications to work
with the custom logic/library.

## Spark API Mode: Spark Client and Spark Classic

Spark provides an API mode, the `spark.api.mode` configuration, which enables Spark Classic applications
to seamlessly switch to Spark Connect. Depending on the value of `spark.api.mode`, the application
runs in either Spark Classic or Spark Connect mode. Here is an example:

{% highlight python %}
from pyspark.sql import SparkSession

SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()
{% endhighlight %}

You can also apply this configuration to both Scala and PySpark applications when submitting them:

{% highlight bash %}
spark-submit --master "..." --conf spark.api.mode=connect
{% endhighlight %}

Additionally, Spark Connect offers convenient options for local testing. By setting `spark.remote`
to `local[...]` or `local-cluster[...]`, you can start a local Spark Connect server and access a Spark
Connect session.
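
For example (a minimal sketch; `local[2]` is a placeholder value), the local testing mode can be
used directly from the PySpark session builder:

{% highlight python %}
from pyspark.sql import SparkSession

# Starts a local Spark Connect server and returns a Spark Connect session.
spark = SparkSession.builder.remote("local[2]").getOrCreate()
spark.range(5).show()
{% endhighlight %}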

This is similar to using `--conf spark.api.mode=connect` with `--master ...`. However, note that
`spark.remote` and `--remote` are limited to `local*` values, while `--conf spark.api.mode=connect`
with `--master ...` supports additional cluster URLs, such as spark://, for broader compatibility with
Spark Classic.

## Spark Client Applications

Spark Client Applications are the _regular Spark applications_ that Spark users develop today, e.g.,
8 changes: 8 additions & 0 deletions docs/configuration.md
@@ -3390,6 +3390,14 @@ They are typically set via the config file and command-line options with `--conf`

<table class="spark-config">
<thead><tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr></thead>
<tr>
<td><code>spark.api.mode</code></td>
<td>
classic
</td>
<td>For Spark Classic applications, specify whether to automatically use Spark Connect by running a local Spark Connect server. The value can be <code>classic</code> or <code>connect</code>.</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.connect.grpc.binding.port</code></td>
<td>
@@ -49,6 +49,9 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
public static final String SPARK_REMOTE = "spark.remote";
public static final String SPARK_LOCAL_REMOTE = "spark.local.connect";

/** The Spark API mode. */
public static final String SPARK_API_MODE = "spark.api.mode";

/** The Spark deploy mode. */
public static final String DEPLOY_MODE = "spark.submit.deployMode";

@@ -384,6 +384,11 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
if (remoteStr != null) {
env.put("SPARK_REMOTE", remoteStr);
env.put("SPARK_CONNECT_MODE_ENABLED", "1");
} else if (conf.getOrDefault(
SparkLauncher.SPARK_API_MODE, "classic").toLowerCase(Locale.ROOT).equals("connect") &&
masterStr != null) {
env.put("SPARK_REMOTE", masterStr);
env.put("SPARK_CONNECT_MODE_ENABLED", "1");
}

if (!isEmpty(pyOpts)) {
@@ -523,7 +528,9 @@ protected boolean handle(String opt, String value) {
checkArgument(value != null, "Missing argument to %s", CONF);
String[] setConf = value.split("=", 2);
checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
if (setConf[0].equals("spark.remote")) {
if (setConf[0].equals("spark.remote") ||
(setConf[0].equals(SparkLauncher.SPARK_API_MODE) &&
setConf[1].toLowerCase(Locale.ROOT).equals("connect"))) {
isRemote = true;
}
conf.put(setConf[0], setConf[1]);
16 changes: 8 additions & 8 deletions python/pyspark/sql/session.py
@@ -473,23 +473,20 @@ def getOrCreate(self) -> "SparkSession":

url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))

if url is None:
raise PySparkRuntimeError(
errorClass="CONNECT_URL_NOT_SET",
messageParameters={},
)

os.environ["SPARK_CONNECT_MODE_ENABLED"] = "1"
opts["spark.remote"] = url
return RemoteSparkSession.builder.config(map=opts).getOrCreate() # type: ignore

from pyspark.core.context import SparkContext

with self._lock:
is_api_mode_connect = opts.get("spark.api.mode", "classic").lower() == "connect"

if (
"SPARK_CONNECT_MODE_ENABLED" in os.environ
or "SPARK_REMOTE" in os.environ
or "spark.remote" in opts
or is_api_mode_connect
):
with SparkContext._lock:
from pyspark.sql.connect.session import SparkSession as RemoteSparkSession
@@ -498,15 +495,18 @@ def getOrCreate(self) -> "SparkSession":
SparkContext._active_spark_context is None
and SparkSession._instantiatedSession is None
):
url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))
if is_api_mode_connect:
url = opts.get("spark.master", os.environ.get("MASTER"))
else:
url = opts.get("spark.remote", os.environ.get("SPARK_REMOTE"))

if url is None:
raise PySparkRuntimeError(
errorClass="CONNECT_URL_NOT_SET",
messageParameters={},
)

if url.startswith("local"):
if url.startswith("local") or is_api_mode_connect:
os.environ["SPARK_LOCAL_REMOTE"] = "1"
RemoteSparkSession._start_connect_server(url, opts)
url = "sc://localhost"
9 changes: 9 additions & 0 deletions python/pyspark/sql/tests/connect/test_connect_session.py
@@ -281,6 +281,15 @@ def test_stop_invalid_session(self):  # SPARK-47986
# Should not throw any error
session.stop()

def test_api_mode(self):
session = (
PySparkSession.builder.config("spark.api.mode", "connect")
.master("sc://localhost")
.getOrCreate()
)
self.assertEqual(session.range(1).first()[0], 0)
self.assertIsInstance(session, RemoteSparkSession)


@unittest.skipIf(not should_test_connect, connect_requirement_message)
class SparkConnectSessionWithOptionsTest(unittest.TestCase):
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import org.apache.spark.internal.config.SPARK_API_MODE

private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>

import PythonTestsSuite._
@@ -67,13 +69,29 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}

// Spark Connect dependencies need to be installed in the Python Dockerfile; ignored for now.
ignore("Run PySpark with Spark Connect", k8sTestTag) {
sparkAppConf.set("spark.kubernetes.container.image", pyImage)
sparkAppConf.set(SPARK_API_MODE.key, "connect")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_CONNECT_FILES,
mainClass = "",
expectedDriverLogOnCompletion = Seq("Python runtime version check for executor is: True"),
appArgs = Array(sparkAppConf.get("spark.master")),
driverPodChecker = doBasicDriverPyPodCheck,
executorPodChecker = doBasicExecutorPyPodCheck,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}
}

private[spark] object PythonTestsSuite {
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONNECT_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles_connect.py"
val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py"
val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py"
}
@@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType


if __name__ == "__main__":
"""
Usage: pyfiles
"""
spark = SparkSession \
.builder \
.appName("PyFilesTest") \
.config("spark.api.mode", "connect") \
.master(sys.argv[1]) \
.getOrCreate()

assert "connect" in str(spark)

# Check python executable at executors
spark.udf.register("get_sys_ver",
lambda: "%d.%d" % sys.version_info[:2], StringType())
[row] = spark.sql("SELECT get_sys_ver()").collect()
driver_version = "%d.%d" % sys.version_info[:2]
print("Python runtime version check for executor is: " + str(row[0] == driver_version))

spark.stop()
@@ -190,6 +190,10 @@ abstract class BaseYarnClusterSuite extends SparkFunSuite with Matchers {
.setPropertiesFile(propsFile)
.addAppArgs(appArgs.toArray: _*)

extraConf.get(SPARK_API_MODE.key).foreach { v =>
launcher.setConf(SPARK_API_MODE.key, v)
}

sparkArgs.foreach { case (name, value) =>
if (value != null) {
launcher.addSparkArg(name, value)