Skip to content

Commit

Permalink
Fixes race condition in CliSuite
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Nov 2, 2014
1 parent 56f2c61 commit a70569c
Showing 1 changed file with 15 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@

package org.apache.spark.sql.hive.thriftserver

import java.io._

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}

import java.io._
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath

class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
Expand All @@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}

// AtomicInteger is needed because stderr and stdout of the forked process are handled in
// different threads.
val next = new AtomicInteger(0)
var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
val buffer = new ArrayBuffer[String]()
val lock = new Object

def captureOutput(source: String)(line: String) {
def captureOutput(source: String)(line: String): Unit = lock.synchronized {
buffer += s"$source> $line"
// If we haven't found all expected answers...
if (next.get() < expectedAnswers.size) {
// If another expected answer is found...
if (line.startsWith(expectedAnswers(next.get()))) {
// If all expected answers have been found...
if (next.incrementAndGet() == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
// If we haven't found all expected answers and another expected answer comes up...
if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
next += 1
// If all expected answers have been found...
if (next == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
}
}
Expand All @@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|=======================
|Spark SQL CLI command line: ${command.mkString(" ")}
|
|Executed query ${next.get()} "${queries(next.get())}",
|But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
|Executed query $next "${queries(next)}",
|But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
|
|${buffer.mkString("\n")}
|===========================
Expand Down

0 comments on commit a70569c

Please sign in to comment.