[SPARK-51146][SQL][FOLLOWUP] Respect system env SPARK_CONNECT_MODE
…in places that access the api mode config

### What changes were proposed in this pull request?

This is a follow-up to the additional Spark Connect distribution work. Some places that access the API mode config do not go through the config framework and therefore do not respect the defined default value. We need to update them manually so that they derive the default correctly from the special system environment variable `SPARK_CONNECT_MODE`, which is set in the scripts of the Spark Connect distribution.
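
A minimal sketch of that lookup, in Python. The helper names `default_api_mode` and `effective_api_mode` are hypothetical and not part of Spark; only the `SPARK_CONNECT_MODE` variable, the `spark.api.mode` key, and the `classic`/`connect` values are taken from this change.

```python
from typing import Mapping


def default_api_mode(env: Mapping[str, str]) -> str:
    """Return the default API mode implied by the environment (hypothetical helper)."""
    # The Spark Connect distribution's scripts are assumed to export SPARK_CONNECT_MODE=1.
    return "connect" if env.get("SPARK_CONNECT_MODE", "") == "1" else "classic"


def effective_api_mode(conf: Mapping[str, str], env: Mapping[str, str]) -> str:
    """An explicit spark.api.mode wins; otherwise use the env-derived default."""
    return conf.get("spark.api.mode", default_api_mode(env)).lower()
```

Passing the environment in as a mapping, rather than reading `os.environ` directly, keeps the sketch easy to exercise with different distributions in mind.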

### Why are the changes needed?

To make the additional Spark Connect distribution work as expected.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Manually tested locally. The expected behavior is:
1. If `--remote` is specified, we pick connect mode no matter what the API mode config is.
2. If `--master` is specified, we pick classic or connect mode depending on the API mode config. If the config is not specified, the default value differs between the default distribution and the Spark Connect distribution.
3. If neither `--remote` nor `--master` is specified, it is equivalent to `--master local[*]`, and rule 2 applies.
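
The three rules above can be sketched as a small Python function. This is an illustration only: `pick_mode` and its parameters are hypothetical names, and the real logic lives in the launcher and in `SparkSession.getOrCreate`. The distribution-specific default is passed in as `default_api_mode`.

```python
from typing import Mapping, Optional, Tuple


def pick_mode(
    remote: Optional[str],
    master: Optional[str],
    conf: Mapping[str, str],
    default_api_mode: str = "classic",
) -> Tuple[str, str]:
    """Return (mode, url) following the three rules (hypothetical helper)."""
    # Rule 1: --remote always means connect mode, whatever spark.api.mode says.
    if remote is not None:
        return "connect", remote
    # Rule 3: with neither flag, behave as if --master local[*] had been given.
    if master is None:
        master = "local[*]"
    # Rule 2: the API mode config decides; its default depends on the distribution.
    api_mode = conf.get("spark.api.mode", default_api_mode).lower()
    return ("connect" if api_mode == "connect" else "classic"), master
```

For example, `pick_mode(None, None, {}, default_api_mode="connect")` models launching the Spark Connect distribution with no flags, and yields connect mode against `local[*]`.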

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #49930 from cloud-fan/release.

Lead-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan and cloud-fan committed Feb 15, 2025
1 parent 09b93bd commit f601eb7
Showing 2 changed files with 15 additions and 9 deletions.
@@ -151,6 +151,12 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
   OptionParser parser = new OptionParser(true);
   parser.parse(submitArgs);
   this.isSpecialCommand = parser.isSpecialCommand;
+  boolean connectByDefault = "1".equals(System.getenv("SPARK_CONNECT_MODE"));
+  String defaultApiMode = connectByDefault ? "connect" : "classic";
+  String apiMode = conf.getOrDefault(SparkLauncher.SPARK_API_MODE, defaultApiMode);
+  if (conf.containsKey("spark.remote") || "connect".equalsIgnoreCase(apiMode)) {
+    isRemote = true;
+  }
 } else {
   this.isExample = isExample;
   this.isSpecialCommand = true;
@@ -384,9 +390,8 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
 if (remoteStr != null) {
   env.put("SPARK_REMOTE", remoteStr);
   env.put("SPARK_CONNECT_MODE_ENABLED", "1");
-} else if (conf.getOrDefault(
-    SparkLauncher.SPARK_API_MODE, "classic").toLowerCase(Locale.ROOT).equals("connect") &&
-    masterStr != null) {
+} else if (isRemote) {
+  // If `remoteStr` is not specified but isRemote is true, it means the api mode is connect.
   env.put("SPARK_REMOTE", masterStr);
   env.put("SPARK_CONNECT_MODE_ENABLED", "1");
 }
@@ -528,11 +533,6 @@ protected boolean handle(String opt, String value) {
   checkArgument(value != null, "Missing argument to %s", CONF);
   String[] setConf = value.split("=", 2);
   checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
-  if (setConf[0].equals("spark.remote") ||
-      (setConf[0].equals(SparkLauncher.SPARK_API_MODE) &&
-       setConf[1].toLowerCase(Locale.ROOT).equals("connect"))) {
-    isRemote = true;
-  }
   conf.put(setConf[0], setConf[1]);
 }
 case CLASS -> {
8 changes: 7 additions & 1 deletion python/pyspark/sql/session.py
@@ -480,7 +480,13 @@ def getOrCreate(self) -> "SparkSession":
 from pyspark.core.context import SparkContext

 with self._lock:
-    is_api_mode_connect = opts.get("spark.api.mode", "classic").lower() == "connect"
+    default_api_mode = "classic"
+    if os.environ.get("SPARK_CONNECT_MODE", "") == "1":
+        default_api_mode = "connect"
+
+    is_api_mode_connect = (
+        opts.get("spark.api.mode", default_api_mode).lower() == "connect"
+    )

     if (
         "SPARK_CONNECT_MODE_ENABLED" in os.environ