From 2b92d3f919a045d20965ddc6b02465a7e5b2c64d Mon Sep 17 00:00:00 2001 From: cody koeninger Date: Wed, 29 Apr 2015 11:47:04 -0500 Subject: [PATCH] [SPARK-2808][Streaming][Kafka] wait for leader offsets in the java test as well --- .../org/apache/spark/streaming/kafka/KafkaCluster.scala | 6 ++++++ .../org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala index 194fcafd6bd93..5425d338311c2 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kafka import scala.util.control.NonFatal import scala.util.Random import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ import java.util.Properties import kafka.api._ import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} @@ -37,6 +38,11 @@ private[spark] class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable { import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig} + /** Constructor that takes a Java map */ + def this(kafkaParams: java.util.Map[String, String]) { + this(kafkaParams.asScala.toMap) + } + // ConsumerConfig isn't serializable @transient private var _config: SimpleConsumerConfig = null diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java index a9dc6e50613ca..d0b137a11b1d0 100644 --- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java +++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java @@ -72,6 +72,10 @@ public void testKafkaRDD() throws InterruptedException { HashMap kafkaParams = new HashMap(); kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress()); + KafkaCluster kc = new KafkaCluster(kafkaParams); + kafkaTestUtils.waitUntilLeaderOffset(kc, topic1, 0, topic1data.length); + kafkaTestUtils.waitUntilLeaderOffset(kc, topic2, 0, topic2data.length); + OffsetRange[] offsetRanges = { OffsetRange.create(topic1, 0, 0, 1), OffsetRange.create(topic2, 0, 0, 1)