Skip to content

Commit

Permalink
fix merge issues
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Dec 21, 2017
1 parent 28ee8a9 commit 89e1c6d
Showing 1 changed file with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,13 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
assertJobs(statusStore.execution(executionId), failed = Seq(0))
}

sqlStoreTest("handle one execution with multiple jobs") { (store, bus) =>
test("handle one execution with multiple jobs") {
val statusStore = createStatusStore()
val listener = statusStore.listener.get

val executionId = 0
val df = createTestDataFrame
bus.postToAll(SparkListenerSQLExecutionStart(
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
Expand All @@ -398,16 +401,16 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
def twoStageJob(jobId: Int): Unit = {
val stages = Seq(stageId, stageId + 1).map { id => createStageInfo(id, 0)}
stageId += 2
bus.postToAll(SparkListenerJobStart(
listener.onJobStart(SparkListenerJobStart(
jobId = jobId,
time = System.currentTimeMillis(),
stageInfos = stages,
createProperties(executionId)))
stages.foreach { s =>
bus.postToAll(SparkListenerStageSubmitted(s))
bus.postToAll(SparkListenerStageCompleted(s))
listener.onStageSubmitted(SparkListenerStageSubmitted(s))
listener.onStageCompleted(SparkListenerStageCompleted(s))
}
bus.postToAll(SparkListenerJobEnd(
listener.onJobEnd(SparkListenerJobEnd(
jobId = jobId,
time = System.currentTimeMillis(),
JobSucceeded
Expand All @@ -416,11 +419,11 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
// submit two jobs with the same executionId
twoStageJob(0)
twoStageJob(1)
bus.postToAll(SparkListenerSQLExecutionEnd(
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))

assertJobs(store.execution(0), completed = 0 to 1)
assert(store.execution(0).get.stages === (0 to 3).toSet)
assertJobs(statusStore.execution(0), completed = 0 to 1)
assert(statusStore.execution(0).get.stages === (0 to 3).toSet)
}

test("SPARK-11126: no memory leak when running non SQL jobs") {
Expand Down

0 comments on commit 89e1c6d

Please sign in to comment.