SPARK-2808 update kafka to version 0.8.2 #3631

Closed
wants to merge 3 commits into from
external/kafka/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0</version>
+      <version>0.8.2.0</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
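For anyone tracking the same upgrade in an sbt build rather than Maven, a rough equivalent of the dependency bump above is sketched below; this is an illustration, not part of the patch.

// Sketch only (not part of this PR): sbt equivalent of the Maven version bump.
// %% appends the Scala binary version, mirroring kafka_${scala.binary.version} in the pom.
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.0"

The remaining hunks update the streaming Kafka test harness (KafkaStreamSuiteBase) to the Kafka 0.8.2 APIs.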
@@ -26,7 +26,7 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
-import kafka.admin.CreateTopicCommand
+import kafka.admin.AdminUtils
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
 import kafka.serializer.{StringDecoder, StringEncoder}
@@ -77,8 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging
     var bindSuccess: Boolean = false
     while(!bindSuccess) {
       try {
-        val brokerProps = getBrokerConfig()
-        brokerConf = new KafkaConfig(brokerProps)
+        brokerConf = new KafkaConfig(brokerConfig)
         server = new KafkaServer(brokerConf)
         logInfo("==================== 2 ====================")
         server.startup()
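For context on the hunk above, the harness builds a KafkaConfig from a plain Properties object and starts an embedded broker with it. A minimal standalone sketch of that pattern follows; the port, ZooKeeper address, and log directory are illustrative assumptions, not values taken from this patch.

// Sketch only: start and stop an embedded Kafka 0.8.2 broker for a test.
import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", "9092")                         // assumed free port
props.put("zookeeper.connect", "localhost:2181")  // assumed running ZooKeeper
props.put("log.dir", "/tmp/embedded-kafka-logs")  // assumed scratch directory

val server = new KafkaServer(new KafkaConfig(props))
server.startup()
// ... exercise the broker here ...
server.shutdown()
server.awaitShutdown()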
@@ -123,27 +122,26 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging
 
   private def createTestMessage(topic: String, sent: Map[String, Int])
     : Seq[KeyedMessage[String, String]] = {
-    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
+    (for ((s, freq) <- sent; i <- 0 until freq) yield {
       new KeyedMessage[String, String](topic, s)
-    }
-    messages.toSeq
+    }).toSeq
   }
 
   def createTopic(topic: String) {
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    AdminUtils.createTopic(zkClient, topic, 1, 1)
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(topic, 0)
+    waitUntilMetadataIsPropagated(Seq(server), topic, 0)
   }
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
-    producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
+    producer = new Producer[String, String](new ProducerConfig(producerConfig))
     producer.send(createTestMessage(topic, sent): _*)
     producer.close()
     logInfo("==================== 6 ====================")
   }
 
-  private def getBrokerConfig(): Properties = {
+  private def brokerConfig: Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
@@ -155,21 +153,29 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging
     props
   }
 
-  private def getProducerConfig(): Properties = {
+  private def producerConfig: Properties = {
     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
     val props = new Properties()
     props.put("metadata.broker.list", brokerAddr)
     props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }
 
-  private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+  private def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int): Int = {
+    var leader: Int = -1
     eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
-      assert(
-        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
-        s"Partition [$topic, $partition] metadata not propagated after timeout"
-      )
+      assert(servers.foldLeft(true) {
+        (result, server) =>
+          val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+          partitionStateOpt match {
+            case None => false
+            case Some(partitionState) =>
+              leader = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+              result && leader >= 0 // is valid broker id
+          }
+      }, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
+    leader
   }
 
   class EmbeddedZookeeper(val zkConnect: String) {
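The reworked waitUntilMetadataIsPropagated polls each broker's metadataCache and returns the elected leader's broker id, rather than the leaderCache lookup used against Kafka 0.8.0. As a rough illustration of the same check outside ScalaTest's eventually, using only the calls that appear in the diff, a plain retry loop could look like the sketch below; the retry count and sleep interval are arbitrary assumptions.

// Sketch only: wait until every broker reports a valid leader for the partition,
// then return that leader's broker id. Mirrors the check in the patched helper.
import kafka.server.KafkaServer

def awaitLeader(servers: Seq[KafkaServer], topic: String, partition: Int, retries: Int = 50): Int = {
  for (_ <- 0 until retries) {
    // Ask each broker's metadata cache which replica currently leads the partition.
    val leaders = servers.map { server =>
      server.apis.metadataCache.getPartitionInfo(topic, partition)
        .map(_.leaderIsrAndControllerEpoch.leaderAndIsr.leader)
    }
    // Propagation is complete once every broker reports a non-negative leader id.
    if (leaders.forall(_.exists(_ >= 0))) {
      return leaders.head.get
    }
    Thread.sleep(100)
  }
  sys.error(s"Partition [$topic, $partition] metadata not propagated after $retries attempts")
}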