diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 104f772fd76bb..b11a73317163a 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -231,11 +231,11 @@ private class KafkaTestUtils extends Logging { /** wait until the leader offset for the given topic / partition equals the specified offset */ def waitUntilLeaderOffset( - kc: KafkaCluster, topic: String, partition: Int, offset: Long): Unit = { eventually(Time(10000), Time(100)) { + val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress)) val tp = TopicAndPartition(topic, partition) val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset assert( diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index d0b137a11b1d0..5cf379635354f 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,9 +72,8 @@ public void testKafkaRDD() throws InterruptedException { HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); - KafkaCluster kc = new KafkaCluster(kafkaParams); - kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length); - kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length); OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala index 996da660b927a..fbaed25ac2176 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala @@ -61,8 +61,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress, "group.id" -> s"test-consumer-${Random.nextInt}") - val kc = new KafkaCluster(kafkaParams) - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, messages.size) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size) val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size)) @@ -87,7 +86,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { // this is the "lots of messages" case kafkaTestUtils.sendMessages(topic, sent) val sentCount = sent.values.sum - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount) // rdd defined from leaders after sending messages, should get the number sent val rdd = getRdd(kc, Set(topic)) @@ -113,7 +112,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll { val sentOnlyOne = Map("d" -> 1) kafkaTestUtils.sendMessages(topic, sentOnlyOne) - kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount + 1) + kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1) assert(rdd2.isDefined) assert(rdd2.get.count === 0, "got messages when there shouldn't be any") diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ee1ede11cf243..5f18e1dbb922c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -615,7 +615,7 @@ def test_kafka_stream(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), "test-streaming-consumer", {topic: 1}, @@ -659,7 +659,7 @@ def test_kafka_rdd(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) self._validateRddResult(sendData, rdd) @@ -675,7 +675,7 @@ def test_kafka_rdd_with_leaders(self): self._kafkaTestUtils.createTopic(topic) self._kafkaTestUtils.sendMessages(topic, sendData) - time.sleep(5) + self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values())) rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) self._validateRddResult(sendData, rdd)