
[SPARK-2808][Streaming][Kafka] make waitUntilLeaderOffset easier to call, call it from python tests as well
koeninger committed May 1, 2015
1 parent d4267e9 commit 1770abc
Showing 4 changed files with 9 additions and 11 deletions.
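
In short: waitUntilLeaderOffset previously required every caller to construct and pass a KafkaCluster; it now builds one internally from the test broker address, and the Python streaming tests call it in place of fixed sleeps. The signature change, as read from the hunks below:

  // before: every caller had to construct and thread through a KafkaCluster
  def waitUntilLeaderOffset(kc: KafkaCluster, topic: String, partition: Int, offset: Long): Unit

  // after: only the topic coordinates and the expected offset are needed
  def waitUntilLeaderOffset(topic: String, partition: Int, offset: Long): Unit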
@@ -231,11 +231,11 @@ private class KafkaTestUtils extends Logging {

   /** wait until the leader offset for the given topic / partition equals the specified offset */
   def waitUntilLeaderOffset(
-      kc: KafkaCluster,
       topic: String,
       partition: Int,
       offset: Long): Unit = {
     eventually(Time(10000), Time(100)) {
+      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
       val tp = TopicAndPartition(topic, partition)
       val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
       assert(
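
Read as one piece, the helper after this change looks roughly as follows. This is a sketch: the assert is cut off below the fold above, so its condition and message are assumptions based on the doc comment ("equals the specified offset"):

  /** wait until the leader offset for the given topic / partition equals the specified offset */
  def waitUntilLeaderOffset(
      topic: String,
      partition: Int,
      offset: Long): Unit = {
    eventually(Time(10000), Time(100)) {
      // the cluster handle is now created internally from the test broker,
      // so callers no longer construct or manage a KafkaCluster themselves
      val kc = new KafkaCluster(Map("metadata.broker.list" -> brokerAddress))
      val tp = TopicAndPartition(topic, partition)
      val llo = kc.getLatestLeaderOffsets(Set(tp)).right.get.apply(tp).offset
      // assumed body: fail (and let eventually retry) until the leader offset matches
      assert(llo == offset, s"$topic $partition: leader offset $llo != expected $offset")
    }
  }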
@@ -72,9 +72,8 @@ public void testKafkaRDD() throws InterruptedException {
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

-    KafkaCluster kc = new KafkaCluster(kafkaParams);
-    kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length);
-    kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length);
+    kafkaTestUtils.waitUntilLeaderOffset(topic1, 0, topic1data.length);
+    kafkaTestUtils.waitUntilLeaderOffset(topic2, 0, topic2data.length);

     OffsetRange[] offsetRanges = {
         OffsetRange.create(topic1, 0, 0, 1),
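
With the KafkaCluster parameter gone, the JVM suites no longer build a cluster handle just to wait on an offset (three deleted lines become two added ones above). Presumably the same simplification is what makes the helper practical to call from the Python tests further down, where constructing a Scala KafkaCluster across the Py4J gateway would be awkward.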
@@ -61,8 +61,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt}")

-    val kc = new KafkaCluster(kafkaParams)
-    kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, messages.size)
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, messages.size)

     val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))

@@ -87,7 +86,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
     val sentCount = sent.values.sum
-    kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount)
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)

     // rdd defined from leaders after sending messages, should get the number sent
     val rdd = getRdd(kc, Set(topic))

@@ -113,7 +112,7 @@ class KafkaRDDSuite extends FunSuite with BeforeAndAfterAll {
     val sentOnlyOne = Map("d" -> 1)

     kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-    kafkaTestUtils.waitUntilLeaderOffset(kc, topic, 0, sentCount + 1)
+    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount + 1)

     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
python/pyspark/streaming/tests.py: 6 changes (3 additions, 3 deletions)
@@ -615,7 +615,7 @@ def test_kafka_stream(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        time.sleep(5)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))

         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                          "test-streaming-consumer", {topic: 1},

@@ -659,7 +659,7 @@ def test_kafka_rdd(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        time.sleep(5)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
         self._validateRddResult(sendData, rdd)

@@ -675,7 +675,7 @@ def test_kafka_rdd_with_leaders(self):

         self._kafkaTestUtils.createTopic(topic)
         self._kafkaTestUtils.sendMessages(topic, sendData)
-        time.sleep(5)
+        self._kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sum(sendData.values()))
         rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
         self._validateRddResult(sendData, rdd)
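
All three Python tests previously slept a fixed five seconds after sendMessages and hoped the broker had caught up; they now block until the leader offset actually equals the number of messages sent, which is faster when the broker is quick and more reliable when it is slow. Going by the eventually(Time(10000), Time(100)) call in the helper, the wait amounts to polling roughly every 100 ms for up to 10 seconds. A minimal self-contained sketch of that poll-until-true pattern (pollUntil and latestLeaderOffset are hypothetical names, for illustration only):

  import scala.annotation.tailrec

  object PollUntil {
    // retry `condition` every `intervalMs` until it holds or `timeoutMs` elapses
    def pollUntil(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Unit = {
      val deadline = System.currentTimeMillis() + timeoutMs
      @tailrec
      def loop(): Unit = {
        if (condition) {
          () // condition met: return normally
        } else if (System.currentTimeMillis() >= deadline) {
          throw new AssertionError(s"condition not met within ${timeoutMs}ms")
        } else {
          Thread.sleep(intervalMs) // wait one interval, then try again
          loop()
        }
      }
      loop()
    }
  }

  // usage, analogous to what the tests now wait on (latestLeaderOffset is hypothetical):
  // PollUntil.pollUntil(10000, 100) { latestLeaderOffset(topic, 0) == expectedOffset }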

