Skip to content

Commit

Permalink
[SPARK-2808][Streaming][Kafka] wait for leader offsets in the java te…
Browse files Browse the repository at this point in the history
…st as well
  • Loading branch information
koeninger committed Apr 29, 2015
1 parent 3824ce3 commit 2b92d3f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka
import scala.util.control.NonFatal
import scala.util.Random
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import java.util.Properties
import kafka.api._
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
Expand All @@ -37,6 +38,11 @@ private[spark]
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}

/** Constructor that takes a Java map */
def this(kafkaParams: java.util.Map[String, String]) {
this(kafkaParams.asScala.toMap)
}

// ConsumerConfig isn't serializable
@transient private var _config: SimpleConsumerConfig = null

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public void testKafkaRDD() throws InterruptedException {
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

KafkaCluster kc = new KafkaCluster(kafkaParams);
kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length);
kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length);

OffsetRange[] offsetRanges = {
OffsetRange.create(topic1, 0, 0, 1),
OffsetRange.create(topic2, 0, 0, 1)
Expand Down

0 comments on commit 2b92d3f

Please sign in to comment.