[SPARK-30090][SHELL] Adapt Spark REPL to Scala 2.13
### What changes were proposed in this pull request?

This is an attempt to adapt Spark REPL to Scala 2.13.

It is based on a [scala-2.13 branch](https://github.com/smarter/spark/tree/scala-2.13) made by smarter.

I had to set the Scala version to 2.13 in some places, and to adapt some other modules, before I could start working on the REPL itself. These changes are separate commits on the branch; they would probably be fixed beforehand and thus dropped before this PR is merged.

I couldn't find a way to run the initialization code with the existing REPL classes in Scala 2.13.2, so I [modified the REPL in Scala](karolchmist/scala@e9cc0dd) to make it work. With this modification I managed to run the Spark shell, and the unit tests pass, which is good news.

The bad news is that it requires an upstream change in Scala, which must be accepted first. I'd be happy to change this if someone points out a way to do it differently. If not, I'd propose a PR in Scala to introduce `ILoop.internalReplAutorunCode`.
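
To make the mechanism concrete, here is a condensed sketch of how the proposed hook would be used. It mirrors the `SparkILoop` added in this PR and assumes the upstream `ILoop.internalReplAutorunCode` change is in place; the class name `AutorunILoop` and the particular commands are illustrative only:

```scala
import java.io.{BufferedReader, PrintWriter}

import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.shell.{ILoop, ShellConfig}

// Sketch only: `internalReplAutorunCode` is the hook proposed upstream and does
// not exist in a released Scala 2.13 at the time of this PR.
class AutorunILoop(in0: BufferedReader, out: PrintWriter)
  extends ILoop(ShellConfig(new GenericRunnerSettings(_ => ())), in0, out) {

  // Commands the shell should run silently before handing control to the user.
  val initializationCommands: Seq[String] = Seq(
    "import org.apache.spark.SparkContext._",
    "import spark.implicits._"
  )

  // With the proposed change, the base ILoop runs these during startup.
  override protected def internalReplAutorunCode(): Seq[String] =
    initializationCommands
}
```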

### Why are the changes needed?

The REPL in Scala 2.13 changed quite a lot, so the current version of the Spark REPL needed to be adapted.

### Does this PR introduce _any_ user-facing change?

In the previous version of `SparkILoop`, a lot of Scala's `ILoop` code was [overridden and duplicated](2bc7b75) to make the welcome message a bit more pleasant. In this PR, the message appears in a slightly different order, but it's still acceptable IMHO.

Before this PR:
```
20/05/15 15:32:39 WARN Utils: Your hostname, hermes resolves to a loopback address: 127.0.1.1; using 192.168.1.28 instead (on interface enp0s31f6)
20/05/15 15:32:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/05/15 15:32:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/05/15 15:32:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Spark context Web UI available at http://192.168.1.28:4041
Spark context available as 'sc' (master = local[*], app id = local-1589549565502).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.1-SNAPSHOT
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```

With this PR:
```
20/05/15 15:32:15 WARN Utils: Your hostname, hermes resolves to a loopback address: 127.0.1.1; using 192.168.1.28 instead (on interface enp0s31f6)
20/05/15 15:32:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/05/15 15:32:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.2-20200422-211118-706ef1b (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context Web UI available at http://192.168.1.28:4040
Spark context available as 'sc' (master = local[*], app id = local-1589549541259).
Spark session available as 'spark'.

scala>
```

The welcome message still seems to be an improvement over [the original ticket](https://issues.apache.org/jira/browse/SPARK-24785), albeit in a different order. As a bonus, some fragile code duplication was removed.
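
For reference, the reordered banner comes from overriding only `printWelcome` instead of copying `ILoop` internals; the snippet below is condensed from the `SparkILoop` added in this PR (see the full file in the diff):

```scala
// Condensed from the new SparkILoop: the banner is a plain printWelcome
// override, so no ILoop processing code needs to be duplicated.
override def printWelcome(): Unit = {
  import org.apache.spark.SPARK_VERSION
  echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
         """.format(SPARK_VERSION))
  echo("Using Scala %s (%s, Java %s)".format(versionString, javaVmName, javaVersion))
  echo("Type in expressions to have them evaluated.")
  echo("Type :help for more information.")
}
```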

### How was this patch tested?

Existing tests pass in the `repl` module. The REPL runs in a terminal, and the following code executes correctly:

```
scala> spark.range(1000 * 1000 * 1000).count()
val res0: Long = 1000000000
```
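
Beyond that, a quick manual smoke check (not part of the PR's test run) could exercise classes defined in the REPL being shipped to executors, which is what the `-Yrepl-class-based` wrapping exists for; the names below are illustrative:

```scala
// Hypothetical extra check, run inside the shell: a case class defined in the
// REPL must be serializable to executors for the typed map/filter to work.
case class Point(x: Long, y: Long)
val points = spark.range(10).map(i => Point(i, i * 2))
points.filter(_.x % 2 == 0).count()  // expected to return 5 (x = 0, 2, 4, 6, 8)
```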

Closes #28545 from karolchmist/scala-2.13-repl.

Authored-by: Karol Chmist <info+github@chmist.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
karolchmist authored and srowen committed Sep 12, 2020
1 parent bbbd907 commit 3be552c
Showing 11 changed files with 741 additions and 89 deletions.
138 changes: 138 additions & 0 deletions repl/src/main/scala-2.13/org/apache/spark/repl/Main.scala
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl

import java.io.File
import java.net.URI
import java.util.Locale

import scala.tools.nsc.GenericRunnerSettings

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.util.Utils

object Main extends Logging {

  initializeLogIfNecessary(true)
  Signaling.cancelOnInterrupt()

  val conf = new SparkConf()
  val rootDir =
    conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
  val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")

  var sparkContext: SparkContext = _
  var sparkSession: SparkSession = _
  // this is a public var because tests reset it.
  var interp: SparkILoop = _

  private var hasErrors = false
  private var isShellSession = false

  private def scalaOptionError(msg: String): Unit = {
    hasErrors = true
    // scalastyle:off println
    Console.err.println(msg)
    // scalastyle:on println
  }

  def main(args: Array[String]): Unit = {
    isShellSession = true
    doMain(args, new SparkILoop)
  }

  // Visible for testing
  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
    interp = _interp
    val jars = Utils
      .getLocalUserJarsForShell(conf)
      // Remove file:///, file:// or file:/ scheme if exists for each jar
      .map { x =>
        if (x.startsWith("file:")) new File(new URI(x)).getPath else x
      }
      .mkString(File.pathSeparator)
    val interpArguments = List(
      "-Yrepl-class-based",
      "-Yrepl-outdir",
      s"${outputDir.getAbsolutePath}",
      "-classpath",
      jars
    ) ++ args.toList

    val settings = new GenericRunnerSettings(scalaOptionError)
    settings.processArguments(interpArguments, true)

    if (!hasErrors) {
      interp.run(settings) // Repl starts and goes in loop of R.E.P.L
      Option(sparkContext).foreach(_.stop)
    }
  }

  def createSparkSession(): SparkSession = {
    try {
      val execUri = System.getenv("SPARK_EXECUTOR_URI")
      conf.setIfMissing("spark.app.name", "Spark shell")
      // SparkContext will detect this configuration and register it with the RpcEnv's
      // file server, setting spark.repl.class.uri to the actual URI for executors to
      // use. This is sort of ugly but since executors are started as part of SparkContext
      // initialization in certain cases, there's an initialization order issue that prevents
      // this from being set after SparkContext is instantiated.
      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
      if (execUri != null) {
        conf.set("spark.executor.uri", execUri)
      }
      if (System.getenv("SPARK_HOME") != null) {
        conf.setSparkHome(System.getenv("SPARK_HOME"))
      }

      val builder = SparkSession.builder.config(conf)
      if (conf
          .get(CATALOG_IMPLEMENTATION.key, "hive")
          .toLowerCase(Locale.ROOT) == "hive") {
        if (SparkSession.hiveClassesArePresent) {
          // In the case that the property is not set at all, builder's config
          // does not have this value set to 'hive' yet. The original default
          // behavior is that when there are hive classes, we use hive catalog.
          sparkSession = builder.enableHiveSupport().getOrCreate()
          logInfo("Created Spark session with Hive support")
        } else {
          // Need to change it back to 'in-memory' if no hive classes are found
          // in the case that the property is set to hive in spark-defaults.conf
          builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
          sparkSession = builder.getOrCreate()
          logInfo("Created Spark session")
        }
      } else {
        // In the case that the property is set but not to 'hive', the internal
        // default is 'in-memory'. So the sparkSession will use in-memory catalog.
        sparkSession = builder.getOrCreate()
        logInfo("Created Spark session")
      }
      sparkContext = sparkSession.sparkContext
      sparkSession
    } catch {
      case e: Exception if isShellSession =>
        logError("Failed to initialize Spark session.", e)
        sys.exit(1)
    }
  }

}
149 changes: 149 additions & 0 deletions repl/src/main/scala-2.13/org/apache/spark/repl/SparkILoop.scala
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl

import java.io.{BufferedReader, PrintWriter}

// scalastyle:off println
import scala.Predef.{println => _, _}
import scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.shell.{ILoop, ShellConfig}
import scala.tools.nsc.util.stringFromStream
import scala.util.Properties.{javaVersion, javaVmName, versionString}
// scalastyle:on println

/**
 * A Spark-specific interactive shell.
 */
class SparkILoop(in0: BufferedReader, out: PrintWriter)
  extends ILoop(ShellConfig(new GenericRunnerSettings(_ => ())), in0, out) {
  def this() = this(null, new PrintWriter(Console.out, true))

  val initializationCommands: Seq[String] = Seq(
    """
    @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
        org.apache.spark.repl.Main.sparkSession
      } else {
        org.apache.spark.repl.Main.createSparkSession()
      }
    @transient val sc = {
      val _sc = spark.sparkContext
      if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
        val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
        if (proxyUrl != null) {
          println(
            s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
        } else {
          println(s"Spark Context Web UI is available at Spark Master Public URL")
        }
      } else {
        _sc.uiWebUrl.foreach {
          webUrl => println(s"Spark context Web UI available at ${webUrl}")
        }
      }
      println("Spark context available as 'sc' " +
        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
      println("Spark session available as 'spark'.")
      _sc
    }
    """,
    "import org.apache.spark.SparkContext._",
    "import spark.implicits._",
    "import spark.sql",
    "import org.apache.spark.sql.functions._"
  )

  override protected def internalReplAutorunCode(): Seq[String] =
    initializationCommands

  def initializeSpark(): Unit = {
    if (!intp.reporter.hasErrors) {
      // `savingReplayStack` removes the commands from session history.
      savingReplayStack {
        initializationCommands.foreach(intp quietRun _)
      }
    } else {
      throw new RuntimeException(
        s"Scala $versionString interpreter encountered " +
          "errors during initialization"
      )
    }
  }

  /** Print a welcome message */
  override def printWelcome(): Unit = {
    import org.apache.spark.SPARK_VERSION
    echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
         """.format(SPARK_VERSION))
    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
      versionString,
      javaVmName,
      javaVersion
    )
    echo(welcomeMsg)
    echo("Type in expressions to have them evaluated.")
    echo("Type :help for more information.")
  }

  /** Available commands */
  override def commands: List[LoopCommand] = standardCommands

  override def resetCommand(line: String): Unit = {
    super.resetCommand(line)
    initializeSpark()
    echo(
      "Note that after :reset, state of SparkSession and SparkContext is unchanged."
    )
  }

  override def replay(): Unit = {
    initializeSpark()
    super.replay()
  }
}

object SparkILoop {

  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   */
  def run(code: String, sets: Settings = new Settings): String = {
    import java.io.{BufferedReader, StringReader, OutputStreamWriter}

    stringFromStream { ostream =>
      Console.withOut(ostream) {
        val input = new BufferedReader(new StringReader(code))
        val output = new PrintWriter(new OutputStreamWriter(ostream), true)
        val repl = new SparkILoop(input, output)

        if (sets.classpath.isDefault) {
          sets.classpath.value = sys.props("java.class.path")
        }
        repl.run(sets)
      }
    }
  }
  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
58 changes: 58 additions & 0 deletions repl/src/test/scala-2.12/org/apache/spark/repl/Repl2Suite.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.repl

import java.io._
import java.nio.file.Files

import scala.tools.nsc.interpreter.SimpleReader

import org.apache.log4j.{Level, LogManager, PropertyConfigurator}
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION

class Repl2Suite extends SparkFunSuite with BeforeAndAfterAll {
  test("propagation of local properties") {
    // A mock ILoop that doesn't install the SIGINT handler.
    class ILoop(out: PrintWriter) extends SparkILoop(None, out) {
      settings = new scala.tools.nsc.Settings
      settings.usejavacp.value = true
      org.apache.spark.repl.Main.interp = this
      in = SimpleReader()
    }

    val out = new StringWriter()
    Main.interp = new ILoop(new PrintWriter(out))
    Main.sparkContext = new SparkContext("local", "repl-test")
    Main.interp.createInterpreter()

    Main.sparkContext.setLocalProperty("someKey", "someValue")

    // Make sure the value we set in the caller to interpret is propagated in the thread that
    // interprets the command.
    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
    assert(out.toString.contains("someValue"))

    Main.sparkContext.stop()
    System.clearProperty("spark.driver.port")
  }
}
