
[SPARK-45318][SHELL][TESTS] Merge test cases from `SingletonRepl2Suite/Repl2Suite` back into `SingletonReplSuite/ReplSuite`

### What changes were proposed in this pull request?
This PR aims to merge test cases from `SingletonRepl2Suite/Repl2Suite` back into `SingletonReplSuite/ReplSuite` to reduce duplicate code.

### Why are the changes needed?
#28545 split the relevant test cases out of `SingletonReplSuite/ReplSuite` into `SingletonRepl2Suite/Repl2Suite` to keep separate test versions for Scala 2.12 and Scala 2.13.

Since Spark 4.0 no longer supports Scala 2.12, these test cases can be merged back into the original files to reduce duplicate code.
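
For context, the duplication came from a small difference in how the two Scala versions' REPLs report result types. Below is a minimal, standalone sketch of that difference (illustrative only, not part of this commit; it reuses the closure shape from the merged tests):

```scala
// Hypothetical standalone example: the same expression whose REPL type report
// differs between Scala 2.12 and Scala 2.13, which is what forced the suite split.
object ReplTypeNameExample {
  def main(args: Array[String]): Unit = {
    val topLevelValue = "someValue"
    val closure = (j: Int) => (1 to j).flatMap { x =>
      (1 to x).map { y => y + topLevelValue }
    }
    val r = Array(0, 1, 2).map(closure)
    // A Scala 2.12 REPL reports this as: r: Array[scala.collection.immutable.IndexedSeq[String]] = ...
    // A Scala 2.13 REPL reports this as: r: Array[IndexedSeq[String]] = ...
    // The values are identical either way:
    r.foreach(println) // Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue)
  }
}
```

The merged suites keep only the Scala 2.13 expectations, which is why the restored assertions below check for `Array[IndexedSeq[String]]`.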

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43104 from LuciferYang/SPARK-45318.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
LuciferYang authored and dongjoon-hyun committed Sep 26, 2023
1 parent b1a0d67 commit 96b591a
Showing 4 changed files with 88 additions and 218 deletions.
46 changes: 0 additions & 46 deletions repl/src/test/scala/org/apache/spark/repl/Repl2Suite.scala

This file was deleted.

24 changes: 23 additions & 1 deletion repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -23,7 +23,7 @@ import java.nio.file.Files
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.{Logger, LoggerContext}

-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -398,4 +398,26 @@ class ReplSuite extends SparkFunSuite {
    assertContains(infoLogMessage2, out)
    assertContains(debugLogMessage1, out)
  }

  test("propagation of local properties") {
    // A mock ILoop that doesn't install the SIGINT handler.
    class ILoop(out: PrintWriter) extends SparkILoop(null, out)

    val out = new StringWriter()
    Main.interp = new ILoop(new PrintWriter(out))
    Main.sparkContext = new SparkContext("local", "repl-test")
    val settings = new scala.tools.nsc.Settings
    settings.usejavacp.value = true
    Main.interp.createInterpreter(settings)

    Main.sparkContext.setLocalProperty("someKey", "someValue")

    // Make sure the value we set in the caller to interpret is propagated in the thread that
    // interprets the command.
    Main.interp.interpret("org.apache.spark.repl.Main.sparkContext.getLocalProperty(\"someKey\")")
    assert(out.toString.contains("someValue"))

    Main.sparkContext.stop()
    System.clearProperty("spark.driver.port")
  }
}
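
As a side note on the mechanism the restored "propagation of local properties" test exercises: `SparkContext.setLocalProperty` attaches a key/value pair to the calling thread, and the test checks that the value is visible from code evaluated by the interpreter. Local properties are also forwarded to tasks launched from that thread; a minimal standalone sketch of that behavior (assuming a local Spark dependency on the classpath; not part of this commit):

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

// Hypothetical standalone example of local-property propagation from the
// driver thread that submits a job to the tasks it launches.
object LocalPropertyExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("local-prop-demo"))
    // Set a property on this (driver-side) thread...
    sc.setLocalProperty("someKey", "someValue")
    // ...and read it back from inside tasks via TaskContext.
    val seen = sc.parallelize(0 until 2)
      .map(_ => TaskContext.get().getLocalProperty("someKey"))
      .collect()
    println(seen.mkString(", ")) // expected: someValue, someValue
    sc.stop()
  }
}
```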
171 changes: 0 additions & 171 deletions repl/src/test/scala/org/apache/spark/repl/SingletonRepl2Suite.scala

This file was deleted.

65 changes: 65 additions & 0 deletions repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -412,4 +412,69 @@ class SingletonReplSuite extends SparkFunSuite {
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
  }

  test("SPARK-31399: should clone+clean line object w/ non-serializable state in ClosureCleaner") {
    // Test ClosureCleaner when a closure captures the enclosing `this` REPL line object, and that
    // object contains an unused non-serializable field.
    // Specifically, the closure in this test case contains a directly nested closure, and the
    // capture is triggered by the inner closure.
    // `ns` should be nulled out, but `topLevelValue` should stay intact.

    // Can't use :paste mode because PipedOutputStream/PipedInputStream doesn't work well with the
    // EOT control character (i.e. Ctrl+D).
    // Just write things on a single line to emulate :paste mode.

    // NOTE: in order for this test case to trigger the intended scenario, the following three
    // variables need to be in the same "input", which will make the REPL pack them into the
    // same REPL line object:
    // - ns: a non-serializable state, not accessed by the closure;
    // - topLevelValue: a serializable state, accessed by the closure;
    // - closure: the starting closure, captures the enclosing REPL line object.
    val output = runInterpreter(
      """
        |class NotSerializableClass(val x: Int)
        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
        |(j: Int) => {
        |  (1 to j).flatMap { x =>
        |    (1 to x).map { y => y + topLevelValue }
        |  }
        |}
        |val r = sc.parallelize(0 to 2).map(closure).collect
      """.stripMargin)
    // assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
    //   "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
    assertContains("r: Array[IndexedSeq[String]] = " +
      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
    assertDoesNotContain("Exception", output)
  }

  test("SPARK-31399: ClosureCleaner should discover indirectly nested closure in inner class") {
    // Similar to the previous test case, but with indirect closure nesting instead.
    // There's still nested closures involved, but the inner closure is indirectly nested in the
    // outer closure, with a level of inner class in between them.
    // This changes how the inner closure references/captures the outer closure/enclosing `this`
    // REPL line object, and covers a different code path in inner closure discovery.

    // `ns` should be nulled out, but `topLevelValue` should stay intact.

    val output = runInterpreter(
      """
        |class NotSerializableClass(val x: Int)
        |val ns = new NotSerializableClass(42); val topLevelValue = "someValue"; val closure =
        |(j: Int) => {
        |  class InnerFoo {
        |    val innerClosure = (x: Int) => (1 to x).map { y => y + topLevelValue }
        |  }
        |  val innerFoo = new InnerFoo
        |  (1 to j).flatMap(innerFoo.innerClosure)
        |}
        |val r = sc.parallelize(0 to 2).map(closure).collect
      """.stripMargin)
    // assertContains("r: Array[scala.collection.immutable.IndexedSeq[String]] = " +
    //   "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
    assertContains("r: Array[IndexedSeq[String]] = " +
      "Array(Vector(), Vector(1someValue), Vector(1someValue, 1someValue, 2someValue))", output)
    assertDoesNotContain("Array(Vector(), Vector(1null), Vector(1null, 1null, 2null)", output)
    assertDoesNotContain("Exception", output)
  }
}
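
For readers who want the SPARK-31399 scenario outside the REPL: the problem is that a closure which only uses `topLevelValue` still captures the whole enclosing line object, which also holds the non-serializable `ns`. A standalone sketch of that capture problem using plain Java serialization (illustrative only, not part of this commit; Spark's ClosureCleaner is what clones the line object and nulls out such unused fields before shipping the closure to executors):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical standalone example of the capture problem the merged tests cover.
object ClosureCaptureExample {
  class NotSerializableClass(val x: Int) // deliberately not Serializable

  // Stands in for a REPL line object holding all vals defined on one input line.
  class ReplLineLike extends Serializable {
    val ns = new NotSerializableClass(42) // never used by the closure below
    val topLevelValue = "someValue"       // used by the closure below
    val closure: Int => Seq[String] =
      (j: Int) => (1 to j).map(y => y + topLevelValue) // refers to this.topLevelValue, so it captures `this`
  }

  def main(args: Array[String]): Unit = {
    val line = new ReplLineLike
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(line.closure)
    catch {
      case e: NotSerializableException =>
        // Fails because serializing the closure drags in `this`, and with it `ns`.
        println(s"serialization failed: ${e.getMessage}")
    }
    out.close()
  }
}
```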
