Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL] Fixes race condition in CliSuite #3060

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fixes race condition in CliSuite
  • Loading branch information
liancheng committed Nov 2, 2014
commit a70569cc0a0f89433e61224732a3fe09aa2669f7
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@

package org.apache.spark.sql.hive.thriftserver

import java.io._

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}

import java.io._
import java.util.concurrent.atomic.AtomicInteger

import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkException, Logging}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath

class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
Expand All @@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}

// AtomicInteger is needed because stderr and stdout of the forked process are handled in
// different threads.
val next = new AtomicInteger(0)
var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
val buffer = new ArrayBuffer[String]()
val lock = new Object

def captureOutput(source: String)(line: String) {
def captureOutput(source: String)(line: String): Unit = lock.synchronized {
buffer += s"$source> $line"
// If we haven't found all expected answers...
if (next.get() < expectedAnswers.size) {
// If another expected answer is found...
if (line.startsWith(expectedAnswers(next.get()))) {
// If all expected answers have been found...
if (next.incrementAndGet() == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
// If we haven't found all expected answers and another expected answer comes up...
if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
next += 1
// If all expected answers have been found...
if (next == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
}
}
Expand All @@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|=======================
|Spark SQL CLI command line: ${command.mkString(" ")}
|
|Executed query ${next.get()} "${queries(next.get())}",
|But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
|Executed query $next "${queries(next)}",
|But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
|
|${buffer.mkString("\n")}
|===========================
Expand Down