Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed May 9, 2017
1 parent 3d14998 commit 7dde745
Showing 1 changed file with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class SingletonReplSuite extends SparkFunSuite {
val classpath = paths.map(new File(_).getAbsolutePath).mkString(File.pathSeparator)

System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
Main.conf.set("spark.master", "local-cluster[2,4,4096]")
Main.conf.set("spark.master", "local-cluster[2,1,1024]")
val interp = new SparkILoop(
new BufferedReader(new InputStreamReader(new PipedInputStream(in))),
new PrintWriter(out))
Expand All @@ -68,6 +68,7 @@ class SingletonReplSuite extends SparkFunSuite {
thread = new Thread(new Runnable {
override def run(): Unit = Main.doMain(Array("-classpath", classpath), interp)
})
thread.setDaemon(true)
thread.start()

waitUntil(() => out.toString.contains("Type :help for more information"))
Expand All @@ -85,23 +86,26 @@ class SingletonReplSuite extends SparkFunSuite {
}

private def waitUntil(cond: () => Boolean): Unit = {
var i = 0
while (i < 100 && !cond()) {
Thread.sleep(500)
i += 1
}
if (i == 100) {
throw new IllegalStateException("timeout after 50 seconds, current output: " + out.toString)
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

eventually(timeout(50.seconds), interval(500.millis)) {
assert(cond(), "current output: " + out.toString)
}
}

/**
* Run the given commands string in a globally shared interpreter instance. Note that the given
* commands should not crash the interpreter, to not affect other test cases.
*/
def runInterpreter(input: String): String = {
val currentOffset = out.getBuffer.length()
// append a `val _result = 1` statement to the end of the given code, so that we can know what's
// append a special statement to the end of the given code, so that we can know what's
// the final output of this code snippet and rely on it to wait until the output is ready.
in.write((input + s"\nval _result = 1\n").getBytes)
val timestamp = System.currentTimeMillis()
in.write((input + s"\nval _result_$timestamp = 1\n").getBytes)
in.flush()
val stopMessage = s"_result: Int = 1"
val stopMessage = s"_result_$timestamp: Int = 1"
waitUntil(() => out.getBuffer.substring(currentOffset).contains(stopMessage))
out.getBuffer.substring(currentOffset)
}
Expand Down

0 comments on commit 7dde745

Please sign in to comment.