diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index b3f44471cd326..8e937e25a9f1c 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml @@ -44,7 +44,7 @@ org.apache.kafka kafka_${scala.binary.version} - 0.8.0 + 0.8.2.0 com.sun.jmx diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala index b19c053ebfc44..ff7f5d5dd3e06 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random -import kafka.admin.CreateTopicCommand +import kafka.admin.AdminUtils import kafka.common.{KafkaException, TopicAndPartition} import kafka.producer.{KeyedMessage, Producer, ProducerConfig} import kafka.serializer.{StringDecoder, StringEncoder} @@ -77,8 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin var bindSuccess: Boolean = false while(!bindSuccess) { try { - val brokerProps = getBrokerConfig() - brokerConf = new KafkaConfig(brokerProps) + brokerConf = new KafkaConfig(brokerConfig) server = new KafkaServer(brokerConf) logInfo("==================== 2 ====================") server.startup() @@ -123,27 +122,26 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin private def createTestMessage(topic: String, sent: Map[String, Int]) : Seq[KeyedMessage[String, String]] = { - val messages = for ((s, freq) <- sent; i <- 0 until freq) yield { + (for ((s, freq) <- sent; i <- 0 until freq) yield { new KeyedMessage[String, String](topic, s) - } - messages.toSeq + }).toSeq } def createTopic(topic: String) { - CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0") + AdminUtils.createTopic(zkClient, topic, 1, 1) logInfo("==================== 5 ====================") // wait until metadata is propagated - waitUntilMetadataIsPropagated(topic, 0) + waitUntilMetadataIsPropagated(Seq(server), topic, 0) } def produceAndSendMessage(topic: String, sent: Map[String, Int]) { - producer = new Producer[String, String](new ProducerConfig(getProducerConfig())) + producer = new Producer[String, String](new ProducerConfig(producerConfig)) producer.send(createTestMessage(topic, sent): _*) producer.close() logInfo("==================== 6 ====================") } - private def getBrokerConfig(): Properties = { + private def brokerConfig: Properties = { val props = new Properties() props.put("broker.id", "0") props.put("host.name", "localhost") @@ -155,7 +153,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def getProducerConfig(): Properties = { + private def producerConfig: Properties = { val brokerAddr = brokerConf.hostName + ":" + brokerConf.port val props = new Properties() props.put("metadata.broker.list", brokerAddr) @@ -163,13 +161,21 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin props } - private def waitUntilMetadataIsPropagated(topic: String, partition: Int) { + private def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int): Int = { + var leader: Int = -1 eventually(timeout(1000 milliseconds), interval(100 milliseconds)) { - assert( - server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)), - s"Partition [$topic, $partition] metadata not propagated after timeout" - ) + assert(servers.foldLeft(true) { + (result, server) => + val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition) + partitionStateOpt match { + case None => false + case Some(partitionState) => + leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader + result && leader >= 0 // is valid broker id + } + }, s"Partition [$topic, $partition] metadata not propagated after timeout") } + leader } class EmbeddedZookeeper(val zkConnect: String) {