From a4f42e976dc607ca92becce5ee90ea40c9b9fd81 Mon Sep 17 00:00:00 2001
From: Karol Chmist
Date: Fri, 24 Jul 2020 09:15:15 +0200
Subject: [PATCH] Adapt REPL to Scala 2.13

---
 .../org/apache/spark/repl/Main.scala        |   0
 .../org/apache/spark/repl/SparkILoop.scala  |   0
 .../org/apache/spark/repl/Main.scala        | 138 ++++++++++++++
 .../org/apache/spark/repl/SparkILoop.scala  | 149 +++++++++++++++
 .../org/apache/spark/repl/Repl2Suite.scala  |  58 ++++++
 .../spark/repl/SingletonRepl2Suite.scala    | 173 +++++++++++++++++
 .../org/apache/spark/repl/Repl2Suite.scala  |  53 ++++++
 .../spark/repl/SingletonRepl2Suite.scala    | 174 ++++++++++++++++++
 .../org/apache/spark/repl/ReplSuite.scala   |  27 ---
 .../spark/repl/SingletonReplSuite.scala     |  61 ------
 10 files changed, 745 insertions(+), 88 deletions(-)
 rename repl/src/main/{scala => scala-2.12}/org/apache/spark/repl/Main.scala (100%)
 rename repl/src/main/{scala => scala-2.12}/org/apache/spark/repl/SparkILoop.scala (100%)
 create mode 100644 repl/src/main/scala-2.13/org/apache/spark/repl/Main.scala
 create mode 100644 repl/src/main/scala-2.13/org/apache/spark/repl/SparkILoop.scala
 create mode 100644 repl/src/test/scala-2.12/org/apache/spark/repl/Repl2Suite.scala
 create mode 100644 repl/src/test/scala-2.12/org/apache/spark/repl/SingletonRepl2Suite.scala
 create mode 100644 repl/src/test/scala-2.13/org/apache/spark/repl/Repl2Suite.scala
 create mode 100644 repl/src/test/scala-2.13/org/apache/spark/repl/SingletonRepl2Suite.scala

diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala-2.12/org/apache/spark/repl/Main.scala
similarity index 100%
rename from repl/src/main/scala/org/apache/spark/repl/Main.scala
rename to repl/src/main/scala-2.12/org/apache/spark/repl/Main.scala
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala-2.12/org/apache/spark/repl/SparkILoop.scala
similarity index 100%
rename from repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
rename to repl/src/main/scala-2.12/org/apache/spark/repl/SparkILoop.scala
diff --git a/repl/src/main/scala-2.13/org/apache/spark/repl/Main.scala b/repl/src/main/scala-2.13/org/apache/spark/repl/Main.scala
new file mode 100644
index 0000000000000..95115934ed1d6
--- /dev/null
+++ b/repl/src/main/scala-2.13/org/apache/spark/repl/Main.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.repl + +import java.io.File +import java.net.URI +import java.util.Locale + +import scala.tools.nsc.GenericRunnerSettings + +import org.apache.spark._ +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.util.Utils + +object Main extends Logging { + + initializeLogIfNecessary(true) + Signaling.cancelOnInterrupt() + + val conf = new SparkConf() + val rootDir = + conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf)) + val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl") + + var sparkContext: SparkContext = _ + var sparkSession: SparkSession = _ + // this is a public var because tests reset it. + var interp: SparkILoop = _ + + private var hasErrors = false + private var isShellSession = false + + private def scalaOptionError(msg: String): Unit = { + hasErrors = true + // scalastyle:off println + Console.err.println(msg) + // scalastyle:on println + } + + def main(args: Array[String]): Unit = { + isShellSession = true + doMain(args, new SparkILoop) + } + + // Visible for testing + private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { + interp = _interp + val jars = Utils + .getLocalUserJarsForShell(conf) + // Remove file:///, file:// or file:/ scheme if exists for each jar + .map { x => + if (x.startsWith("file:")) new File(new URI(x)).getPath else x + } + .mkString(File.pathSeparator) + val interpArguments = List( + "-Yrepl-class-based", + "-Yrepl-outdir", + s"${outputDir.getAbsolutePath}", + "-classpath", + jars + ) ++ args.toList + + val settings = new GenericRunnerSettings(scalaOptionError) + settings.processArguments(interpArguments, true) + + if (!hasErrors) { + interp.run(settings) // Repl starts and goes in loop of R.E.P.L + Option(sparkContext).foreach(_.stop) + } + } + + def createSparkSession(): SparkSession = { + try { + val execUri = System.getenv("SPARK_EXECUTOR_URI") + conf.setIfMissing("spark.app.name", "Spark shell") + // SparkContext will detect this configuration and register it with the RpcEnv's + // file server, setting spark.repl.class.uri to the actual URI for executors to + // use. This is sort of ugly but since executors are started as part of SparkContext + // initialization in certain cases, there's an initialization order issue that prevents + // this from being set after SparkContext is instantiated. + conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()) + if (execUri != null) { + conf.set("spark.executor.uri", execUri) + } + if (System.getenv("SPARK_HOME") != null) { + conf.setSparkHome(System.getenv("SPARK_HOME")) + } + + val builder = SparkSession.builder.config(conf) + if (conf + .get(CATALOG_IMPLEMENTATION.key, "hive") + .toLowerCase(Locale.ROOT) == "hive") { + if (SparkSession.hiveClassesArePresent) { + // In the case that the property is not set at all, builder's config + // does not have this value set to 'hive' yet. The original default + // behavior is that when there are hive classes, we use hive catalog. 
+ sparkSession = builder.enableHiveSupport().getOrCreate() + logInfo("Created Spark session with Hive support") + } else { + // Need to change it back to 'in-memory' if no hive classes are found + // in the case that the property is set to hive in spark-defaults.conf + builder.config(CATALOG_IMPLEMENTATION.key, "in-memory") + sparkSession = builder.getOrCreate() + logInfo("Created Spark session") + } + } else { + // In the case that the property is set but not to 'hive', the internal + // default is 'in-memory'. So the sparkSession will use in-memory catalog. + sparkSession = builder.getOrCreate() + logInfo("Created Spark session") + } + sparkContext = sparkSession.sparkContext + sparkSession + } catch { + case e: Exception if isShellSession => + logError("Failed to initialize Spark session.", e) + sys.exit(1) + } + } + +} diff --git a/repl/src/main/scala-2.13/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala-2.13/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000000000..861cf5a740ce1 --- /dev/null +++ b/repl/src/main/scala-2.13/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io.{BufferedReader, PrintWriter} + +// scalastyle:off println +import scala.Predef.{println => _, _} +import scala.tools.nsc.GenericRunnerSettings +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.shell.{ILoop, ShellConfig} +import scala.tools.nsc.util.stringFromStream +import scala.util.Properties.{javaVersion, javaVmName, versionString} +// scalastyle:on println + +/** + * A Spark-specific interactive shell. 
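[Reviewer note] The catalog selection in Main.createSparkSession above boils down to the value of spark.sql.catalogImplementation plus whether the Hive classes are on the classpath. A minimal sketch of that decision in isolation; the object and method names here are illustrative and not part of the patch, and hiveClassesPresent stands in for SparkSession.hiveClassesArePresent:

    import java.util.Locale
    import org.apache.spark.SparkConf

    // Illustrative helper mirroring the branch structure of createSparkSession.
    object CatalogChoiceSketch {
      def choose(conf: SparkConf, hiveClassesPresent: Boolean): String = {
        val requested = conf
          .get("spark.sql.catalogImplementation", "hive")
          .toLowerCase(Locale.ROOT)
        // Hive catalog only when it is requested (or defaulted) AND the classes exist.
        if (requested == "hive" && hiveClassesPresent) "hive" else "in-memory"
      }
    }
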
+ */ +class SparkILoop(in0: BufferedReader, out: PrintWriter) + extends ILoop(ShellConfig(new GenericRunnerSettings(_ => ())), in0, out) { + def this() = this(null, new PrintWriter(Console.out, true)) + + val initializationCommands: Seq[String] = Seq( + """ + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } + @transient val sc = { + val _sc = spark.sparkContext + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println( + s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } + println("Spark context available as 'sc' " + + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") + println("Spark session available as 'spark'.") + _sc + } + """, + "import org.apache.spark.SparkContext._", + "import spark.implicits._", + "import spark.sql", + "import org.apache.spark.sql.functions._" + ) + + override protected def internalReplAutorunCode(): Seq[String] = + initializationCommands + + def initializeSpark(): Unit = { + if (!intp.reporter.hasErrors) { + // `savingReplayStack` removes the commands from session history. + savingReplayStack { + initializationCommands.foreach(intp quietRun _) + } + } else { + throw new RuntimeException( + s"Scala $versionString interpreter encountered " + + "errors during initialization" + ) + } + } + + /** Print a welcome message */ + override def printWelcome(): Unit = { + import org.apache.spark.SPARK_VERSION + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ + """.format(SPARK_VERSION)) + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, + javaVmName, + javaVersion + ) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + /** Available commands */ + override def commands: List[LoopCommand] = standardCommands + + override def resetCommand(line: String): Unit = { + super.resetCommand(line) + initializeSpark() + echo( + "Note that after :reset, state of SparkSession and SparkContext is unchanged." + ) + } + + override def replay(): Unit = { + initializeSpark() + super.replay() + } +} + +object SparkILoop { + + /** + * Creates an interpreter loop with default settings and feeds + * the given code to it as input. 
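[Reviewer note] The initializationCommands above are what bind spark and sc when the shell starts. Stripped of the Web UI banner printing, the bootstrap they run is a reuse-or-create on Main.sparkSession; a minimal sketch, with the wrapper object being hypothetical:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SparkSession

    // Hypothetical wrapper showing the reuse-or-create logic performed by the
    // REPL autorun commands: take the session Main already holds, or build one.
    object ReplBindingsSketch {
      def bind(): (SparkSession, SparkContext) = {
        val spark =
          if (org.apache.spark.repl.Main.sparkSession != null) {
            org.apache.spark.repl.Main.sparkSession
          } else {
            org.apache.spark.repl.Main.createSparkSession()
          }
        (spark, spark.sparkContext)
      }
    }
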
+ */ + def run(code: String, sets: Settings = new Settings): String = { + import java.io.{BufferedReader, StringReader, OutputStreamWriter} + + stringFromStream { ostream => + Console.withOut(ostream) { + val input = new BufferedReader(new StringReader(code)) + val output = new PrintWriter(new OutputStreamWriter(ostream), true) + val repl = new SparkILoop(input, output) + + if (sets.classpath.isDefault) { + sets.classpath.value = sys.props("java.class.path") + } + repl.run(sets) + } + } + } + def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString) +} diff --git a/repl/src/test/scala-2.12/org/apache/spark/repl/Repl2Suite.scala b/repl/src/test/scala-2.12/org/apache/spark/repl/Repl2Suite.scala new file mode 100644 index 0000000000000..4ffa8beaf4740 --- /dev/null +++ b/repl/src/test/scala-2.12/org/apache/spark/repl/Repl2Suite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io._ +import java.nio.file.Files + +import scala.tools.nsc.interpreter.SimpleReader + +import org.apache.log4j.{Level, LogManager, PropertyConfigurator} +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION + +class Repl2Suite extends SparkFunSuite with BeforeAndAfterAll { + test("propagation of local properties") { + // A mock ILoop that doesn't install the SIGINT handler. + class ILoop(out: PrintWriter) extends SparkILoop(None, out) { + settings = new scala.tools.nsc.Settings + settings.usejavacp.value = true + org.apache.spark.repl.Main.interp = this + in = SimpleReader() + } + + val out = new StringWriter() + Main.interp = new ILoop(new PrintWriter(out)) + Main.sparkContext = new SparkContext("local", "repl-test") + Main.interp.createInterpreter() + + Main.sparkContext.setLocalProperty("someKey", "someValue") + + // Make sure the value we set in the caller to interpret is propagated in the thread that + // interprets the command. + Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")") + assert(out.toString.contains("someValue")) + + Main.sparkContext.stop() + System.clearProperty("spark.driver.port") + } +} diff --git a/repl/src/test/scala-2.12/org/apache/spark/repl/SingletonRepl2Suite.scala b/repl/src/test/scala-2.12/org/apache/spark/repl/SingletonRepl2Suite.scala new file mode 100644 index 0000000000000..b165e97fc9dcc --- /dev/null +++ b/repl/src/test/scala-2.12/org/apache/spark/repl/SingletonRepl2Suite.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
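[Reviewer note] The SparkILoop.run helpers defined above capture everything the loop prints into a String (via stringFromStream and Console.withOut). A hedged usage sketch: it spins up a full interpreter and, through the autorun commands, a SparkSession, so a master must be configured first (the local[2] value below is just an example):

    object RunHelperSketch {
      def main(args: Array[String]): Unit = {
        // Give the autorun-created SparkSession somewhere to run.
        org.apache.spark.repl.Main.conf.set("spark.master", "local[2]")
        // Feed two lines to a throwaway loop and collect its transcript.
        val transcript = org.apache.spark.repl.SparkILoop.run(List(
          "val xs = 1 to 3",
          "xs.map(_ * 2).sum"
        ))
        println(transcript)
      }
    }
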
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io._ + +import org.apache.commons.text.StringEscapeUtils +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * A special test suite for REPL that all test cases share one REPL instance. + */ +class SingletonRepl2Suite extends SparkFunSuite { + private val out = new StringWriter() + private val in = new PipedOutputStream() + private var thread: Thread = _ + + private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" + private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) + + override def beforeAll(): Unit = { + super.beforeAll() + + val classpath = System.getProperty("java.class.path") + System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) + + Main.conf.set("spark.master", "local-cluster[2,1,1024]") + val interp = new SparkILoop( + new BufferedReader(new InputStreamReader(new PipedInputStream(in))), + new PrintWriter(out)) + + // Forces to create new SparkContext + Main.sparkContext = null + Main.sparkSession = null + + // Starts a new thread to run the REPL interpreter, so that we won't block. + thread = new Thread(() => Main.doMain(Array("-classpath", classpath), interp)) + thread.setDaemon(true) + thread.start() + + waitUntil(() => out.toString.contains("Type :help for more information")) + } + + override def afterAll(): Unit = { + in.close() + thread.join() + if (oldExecutorClasspath != null) { + System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) + } else { + System.clearProperty(CONF_EXECUTOR_CLASSPATH) + } + super.afterAll() + } + + private def waitUntil(cond: () => Boolean): Unit = { + import scala.concurrent.duration._ + import org.scalatest.concurrent.Eventually._ + + eventually(timeout(50.seconds), interval(500.millis)) { + assert(cond(), "current output: " + out.toString) + } + } + + /** + * Run the given commands string in a globally shared interpreter instance. Note that the given + * commands should not crash the interpreter, to not affect other test cases. + */ + def runInterpreter(input: String): String = { + val currentOffset = out.getBuffer.length() + // append a special statement to the end of the given code, so that we can know what's + // the final output of this code snippet and rely on it to wait until the output is ready. 
+ val timestamp = System.currentTimeMillis() + in.write((input + s"\nval _result_$timestamp = 1\n").getBytes) + in.flush() + val stopMessage = s"_result_$timestamp: Int = 1" + waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage)) + out.getBuffer.substring(currentOffset) + } + + def assertContains(message: String, output: String): Unit = { + val isContain = output.contains(message) + assert(isContain, + "Interpreter output did not contain '" + message + "':\n" + output) + } + + def assertDoesNotContain(message: String, output: String): Unit = { + val isContain = output.contains(message) + assert(!isContain, + "Interpreter output contained '" + message + "':\n" + output) + } + + test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") { + // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that + // object contains an unused non-serializable field. + // Specifically, the closure in this test case contains a directly nested closure, and the + // capture is triggered by the inner closure. + // `ns` should be nulled out, but `topLevelValue` should stay intact. + + // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the + // EOT control character (i.e. Ctrl+D). + // Just write things on a single line to emulate :paste mode. + + // NOTE: in order for this test case to trigger the intended scenario, the following three + // variables need to be in the same "input", which will make the REPL pack them into the + // same REPL line object: + // - ns: a non-serializable state, not accessed by the closure; + // - topLevelValue: a serializable state, accessed by the closure; + // - closure: the starting closure, captures the enclosing REPL line object. + val output = runInterpreter( + """ + |class NotSerializableClass(val x: Int) + |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = + |(j: Int) => { + | (1 to j).flatMap { x => + | (1 to x).map { y => y + topLevelValue } + | } + |} + |val r = sc.parallelize(0 to 2).map(closure).collect + """.stripMargin) + assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + + "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) +// assertContains("r: Array[IndexedSeq[String]] = " + +// "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertDoesNotContain("Exception", output) + } + + test("SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class") { + // Similar to the previous test case, but with indirect closure nesting instead. + // There's still nested closures involved, but the inner closure is indirectly nested in the + // outer closure, with a level of inner class in between them. + // This changes how the inner closure references/captures the outer closure/enclosing `this` + // REPL line object, and covers a different code path in inner closure discovery. + + // `ns` should be nulled out, but `topLevelValue` should stay intact. 
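[Reviewer note] runInterpreter above synchronizes with the interpreter thread by appending a sentinel binding and polling the shared writer until its echo appears. The same pattern reduced to its essentials; the names and the polling interval are illustrative:

    // Illustrative reduction of the sentinel trick: `send` pushes text to the
    // interpreter's stdin pipe, `snapshot` reads everything it has printed so far.
    object SentinelSyncSketch {
      def await(send: String => Unit, snapshot: () => String): String = {
        val before = snapshot().length
        val tag = s"_probe_${System.currentTimeMillis()}"
        send(s"val $tag = 1\n")
        // Block until the REPL echoes the sentinel binding, then return new output.
        while (!snapshot().substring(before).contains(s"$tag: Int = 1")) {
          Thread.sleep(100)
        }
        snapshot().substring(before)
      }
    }
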
+ + val output = runInterpreter( + """ + |class NotSerializableClass(val x: Int) + |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = + |(j: Int) => { + | class InnerFoo { + | val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue } + | } + | val innerFoo = new InnerFoo + | (1 to j).flatMap(innerFoo.innerClosure) + |} + |val r = sc.parallelize(0 to 2).map(closure).collect + """.stripMargin) + assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + + "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) +// assertContains("r: Array[IndexedSeq[String]] = " + +// "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)", output) + assertDoesNotContain("Exception", output) + } + + } diff --git a/repl/src/test/scala-2.13/org/apache/spark/repl/Repl2Suite.scala b/repl/src/test/scala-2.13/org/apache/spark/repl/Repl2Suite.scala new file mode 100644 index 0000000000000..a93284a129e28 --- /dev/null +++ b/repl/src/test/scala-2.13/org/apache/spark/repl/Repl2Suite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io._ +import java.nio.file.Files + +import org.apache.log4j.{Level, LogManager, PropertyConfigurator} +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.{SparkContext, SparkFunSuite} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION + +class Repl2Suite extends SparkFunSuite with BeforeAndAfterAll { + test("propagation of local properties") { + // A mock ILoop that doesn't install the SIGINT handler. + class ILoop(out: PrintWriter) extends SparkILoop(null, out) + + val out = new StringWriter() + Main.interp = new ILoop(new PrintWriter(out)) + Main.sparkContext = new SparkContext("local", "repl-test") + val settings = new scala.tools.nsc.Settings + settings.usejavacp.value = true + Main.interp.createInterpreter(settings) + + Main.sparkContext.setLocalProperty("someKey", "someValue") + + // Make sure the value we set in the caller to interpret is propagated in the thread that + // interprets the command. 
+ Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")") + assert(out.toString.contains("someValue")) + + Main.sparkContext.stop() + System.clearProperty("spark.driver.port") + } +} diff --git a/repl/src/test/scala-2.13/org/apache/spark/repl/SingletonRepl2Suite.scala b/repl/src/test/scala-2.13/org/apache/spark/repl/SingletonRepl2Suite.scala new file mode 100644 index 0000000000000..8e304bc812ba4 --- /dev/null +++ b/repl/src/test/scala-2.13/org/apache/spark/repl/SingletonRepl2Suite.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io._ + +import org.apache.commons.text.StringEscapeUtils + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.Utils + +/** + * A special test suite for REPL that all test cases share one REPL instance. + */ +class SingletonRepl2Suite extends SparkFunSuite { + + private val out = new StringWriter() + private val in = new PipedOutputStream() + private var thread: Thread = _ + + private val CONF_EXECUTOR_CLASSPATH = "spark.executor.extraClassPath" + private val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH) + + override def beforeAll(): Unit = { + super.beforeAll() + + val classpath = System.getProperty("java.class.path") + System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath) + + Main.conf.set("spark.master", "local-cluster[2,1,1024]") + val interp = new SparkILoop( + new BufferedReader(new InputStreamReader(new PipedInputStream(in))), + new PrintWriter(out)) + + // Forces to create new SparkContext + Main.sparkContext = null + Main.sparkSession = null + + // Starts a new thread to run the REPL interpreter, so that we won't block. + thread = new Thread(() => Main.doMain(Array("-classpath", classpath), interp)) + thread.setDaemon(true) + thread.start() + + waitUntil(() => out.toString.contains("Type :help for more information")) + } + + override def afterAll(): Unit = { + in.close() + thread.join() + if (oldExecutorClasspath != null) { + System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath) + } else { + System.clearProperty(CONF_EXECUTOR_CLASSPATH) + } + super.afterAll() + } + + private def waitUntil(cond: () => Boolean): Unit = { + import scala.concurrent.duration._ + import org.scalatest.concurrent.Eventually._ + + eventually(timeout(50.seconds), interval(500.millis)) { + assert(cond(), "current output: " + out.toString) + } + } + + /** + * Run the given commands string in a globally shared interpreter instance. Note that the given + * commands should not crash the interpreter, to not affect other test cases. 
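[Reviewer note] The two Repl2Suite variants exist because the loop is wired differently across Scala versions: on 2.12 SparkILoop takes an Option[BufferedReader] and the test mutates settings and in before a no-argument createInterpreter(), while on 2.13 the reader and writer go to the constructor and the Settings are handed to createInterpreter. A condensed sketch of the 2.13-side construction used above, placed in the same package so createInterpreter is reachable, as in the test; the object name is illustrative:

    package org.apache.spark.repl

    import java.io.{PrintWriter, StringWriter}

    // Condensed 2.13-style construction, mirroring the 2.13 Repl2Suite above.
    object Loop213ConstructionSketch {
      def make(out: StringWriter): SparkILoop = {
        val loop = new SparkILoop(null, new PrintWriter(out))
        val settings = new scala.tools.nsc.Settings
        settings.usejavacp.value = true
        loop.createInterpreter(settings)
        loop
      }
    }
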
+ */ + def runInterpreter(input: String): String = { + val currentOffset = out.getBuffer.length() + // append a special statement to the end of the given code, so that we can know what's + // the final output of this code snippet and rely on it to wait until the output is ready. + val timestamp = System.currentTimeMillis() + in.write((input + s"\nval _result_$timestamp = 1\n").getBytes) + in.flush() + val stopMessage = s"_result_$timestamp: Int = 1" + waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage)) + out.getBuffer.substring(currentOffset) + } + + def assertContains(message: String, output: String): Unit = { + val isContain = output.contains(message) + assert(isContain, + "Interpreter output did not contain '" + message + "':\n" + output) + } + + def assertDoesNotContain(message: String, output: String): Unit = { + val isContain = output.contains(message) + assert(!isContain, + "Interpreter output contained '" + message + "':\n" + output) + } + + test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") { + // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that + // object contains an unused non-serializable field. + // Specifically, the closure in this test case contains a directly nested closure, and the + // capture is triggered by the inner closure. + // `ns` should be nulled out, but `topLevelValue` should stay intact. + + // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the + // EOT control character (i.e. Ctrl+D). + // Just write things on a single line to emulate :paste mode. + + // NOTE: in order for this test case to trigger the intended scenario, the following three + // variables need to be in the same "input", which will make the REPL pack them into the + // same REPL line object: + // - ns: a non-serializable state, not accessed by the closure; + // - topLevelValue: a serializable state, accessed by the closure; + // - closure: the starting closure, captures the enclosing REPL line object. + val output = runInterpreter( + """ + |class NotSerializableClass(val x: Int) + |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = + |(j: Int) => { + | (1 to j).flatMap { x => + | (1 to x).map { y => y + topLevelValue } + | } + |} + |val r = sc.parallelize(0 to 2).map(closure).collect + """.stripMargin) +// assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + +// "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertContains("r: Array[IndexedSeq[String]] = " + + "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertDoesNotContain("Exception", output) + } + + test("SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class") { + // Similar to the previous test case, but with indirect closure nesting instead. + // There's still nested closures involved, but the inner closure is indirectly nested in the + // outer closure, with a level of inner class in between them. + // This changes how the inner closure references/captures the outer closure/enclosing `this` + // REPL line object, and covers a different code path in inner closure discovery. + + // `ns` should be nulled out, but `topLevelValue` should stay intact. 
+ + val output = runInterpreter( + """ + |class NotSerializableClass(val x: Int) + |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = + |(j: Int) => { + | class InnerFoo { + | val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue } + | } + | val innerFoo = new InnerFoo + | (1 to j).flatMap(innerFoo.innerClosure) + |} + |val r = sc.parallelize(0 to 2).map(closure).collect + """.stripMargin) +// assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + +// "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertContains("r: Array[IndexedSeq[String]] = " + + "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) + assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)", output) + assertDoesNotContain("Exception", output) + } +} diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 1e92b36c336d8..95d908cec5de0 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -20,8 +20,6 @@ package org.apache.spark.repl import java.io._ import java.nio.file.Files -import scala.tools.nsc.interpreter.SimpleReader - import org.apache.log4j.{Level, LogManager, PropertyConfigurator} import org.scalatest.BeforeAndAfterAll @@ -86,31 +84,6 @@ class ReplSuite extends SparkFunSuite with BeforeAndAfterAll { "Interpreter output contained '" + message + "':\n" + output) } - test("propagation of local properties") { - // A mock ILoop that doesn't install the SIGINT handler. - class ILoop(out: PrintWriter) extends SparkILoop(None, out) { - settings = new scala.tools.nsc.Settings - settings.usejavacp.value = true - org.apache.spark.repl.Main.interp = this - in = SimpleReader() - } - - val out = new StringWriter() - Main.interp = new ILoop(new PrintWriter(out)) - Main.sparkContext = new SparkContext("local", "repl-test") - Main.interp.createInterpreter() - - Main.sparkContext.setLocalProperty("someKey", "someValue") - - // Make sure the value we set in the caller to interpret is propagated in the thread that - // interprets the command. - Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")") - assert(out.toString.contains("someValue")) - - Main.sparkContext.stop() - System.clearProperty("spark.driver.port") - } - test("SPARK-15236: use Hive catalog") { // turn on the INFO log so that it is possible the code will dump INFO // entry for using "HiveMetastore" diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala index e11a54bc88070..4795306692f7a 100644 --- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala @@ -380,67 +380,6 @@ class SingletonReplSuite extends SparkFunSuite { assertDoesNotContain("Exception", output) } - test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") { - // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that - // object contains an unused non-serializable field. - // Specifically, the closure in this test case contains a directly nested closure, and the - // capture is triggered by the inner closure. 
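[Reviewer note] One detail behind the differing expected strings: the 2.12 suites (and the tests removed from SingletonReplSuite below) assert on Array[scala.collection.immutable.IndexedSeq[String]], while the 2.13 copies assert on Array[IndexedSeq[String]]. In 2.13 the default scala.IndexedSeq alias is the immutable one, so the REPL no longer prints the fully qualified name. A tiny illustration; the reported type strings in the comments are what each REPL prints, as asserted above:

    object IndexedSeqAliasSketch {
      // Same shape as the closures in the tests: Range#map yields an immutable IndexedSeq.
      val sample = (1 to 2).map(i => i.toString + "someValue")
      // 2.13 REPL reports: sample: IndexedSeq[String] = Vector(1someValue, 2someValue)
      // 2.12 REPL reports: sample: scala.collection.immutable.IndexedSeq[String] = Vector(1someValue, 2someValue)
    }
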
- // `ns` should be nulled out, but `topLevelValue` should stay intact. - - // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the - // EOT control character (i.e. Ctrl+D). - // Just write things on a single line to emulate :paste mode. - - // NOTE: in order for this test case to trigger the intended scenario, the following three - // variables need to be in the same "input", which will make the REPL pack them into the - // same REPL line object: - // - ns: a non-serializable state, not accessed by the closure; - // - topLevelValue: a serializable state, accessed by the closure; - // - closure: the starting closure, captures the enclosing REPL line object. - val output = runInterpreter( - """ - |class NotSerializableClass(val x: Int) - |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = - |(j: Int) => { - | (1 to j).flatMap { x => - | (1 to x).map { y => y + topLevelValue } - | } - |} - |val r = sc.parallelize(0 to 2).map(closure).collect - """.stripMargin) - assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + - "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) - assertDoesNotContain("Exception", output) - } - - test("SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class") { - // Similar to the previous test case, but with indirect closure nesting instead. - // There's still nested closures involved, but the inner closure is indirectly nested in the - // outer closure, with a level of inner class in between them. - // This changes how the inner closure references/captures the outer closure/enclosing `this` - // REPL line object, and covers a different code path in inner closure discovery. - - // `ns` should be nulled out, but `topLevelValue` should stay intact. - - val output = runInterpreter( - """ - |class NotSerializableClass(val x: Int) - |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure = - |(j: Int) => { - | class InnerFoo { - | val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue } - | } - | val innerFoo = new InnerFoo - | (1 to j).flatMap(innerFoo.innerClosure) - |} - |val r = sc.parallelize(0 to 2).map(closure).collect - """.stripMargin) - assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " + - "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output) - assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)", output) - assertDoesNotContain("Exception", output) - } - test("newProductSeqEncoder with REPL defined class") { val output = runInterpreter( """