Skip to content

Commit

Permalink
[HOTFIX][Streaming] Handle port collisions in flume polling test
Browse files Browse the repository at this point in the history
This is failing my tests in apache#1777. @tdas

Author: Andrew Or <andrewor14@gmail.com>

Closes apache#1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits:

ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions
54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test
664095c [Andrew Or] Tone down bind exception message
af3ddc9 [Andrew Or] Handle port collisions in flume polling test
  • Loading branch information
andrewor14 authored and conviva-zz committed Sep 4, 2014
1 parent 2f33da5 commit 2d31b6a
Showing 1 changed file with 31 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
import org.apache.spark.streaming.flume.sink._
import org.apache.spark.util.Utils

class FlumePollingStreamSuite extends TestSuiteBase {

Expand All @@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
val eventsPerBatch = 100
val totalEventsPerChannel = batchCount * eventsPerBatch
val channelCapacity = 5000
val maxAttempts = 5

test("flume polling test") {
testMultipleTimes(testFlumePolling)
}

test("flume polling test multiple hosts") {
testMultipleTimes(testFlumePollingMultipleHost)
}

/**
* Run the given test until no more java.net.BindException's are thrown.
* Do this only up to a certain attempt limit.
*/
private def testMultipleTimes(test: () => Unit): Unit = {
var testPassed = false
var attempt = 0
while (!testPassed && attempt < maxAttempts) {
try {
test()
testPassed = true
} catch {
case e: Exception if Utils.isBindCollision(e) =>
logWarning("Exception when running flume polling test: " + e)
attempt += 1
}
}
assert(testPassed, s"Test failed after $attempt attempts!")
}

private def testFlumePolling(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
Expand Down Expand Up @@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
channel.stop()
}

test("flume polling test multiple hosts") {
private def testFlumePollingMultipleHost(): Unit = {
val testPort = getTestPort
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
Expand Down

0 comments on commit 2d31b6a

Please sign in to comment.