From 326ff3cbda37066ebef7492241276754164d2879 Mon Sep 17 00:00:00 2001
From: cody koeninger
Date: Tue, 16 Dec 2014 15:27:44 -0600
Subject: [PATCH] add some tests

---
 .../apache/spark/rdd/kafka/KafkaCluster.scala |  4 +-
 .../org/apache/spark/rdd/kafka/KafkaRDD.scala |  6 +-
 .../spark/rdd/kafka/KafkaRDDSuite.scala       | 79 +++++++++++++++++++
 .../streaming/kafka/KafkaStreamSuite.scala    |  2 +-
 4 files changed, 86 insertions(+), 5 deletions(-)
 create mode 100644 external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala

diff --git a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala
index 4c48639d939ff..afbe18ff1c3e0 100644
--- a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala
@@ -32,7 +32,7 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  * NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
 class KafkaCluster(val kafkaParams: Map[String, String]) {
-  type Err = ArrayBuffer[Throwable]
+  import KafkaCluster.Err
 
   val seedBrokers: Array[(String, Int)] =
     kafkaParams.get("metadata.broker.list")
@@ -287,6 +287,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) {
 }
 
 object KafkaCluster {
+  type Err = ArrayBuffer[Throwable]
+
   /** Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
diff --git a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
index c2a56b556fdd2..5837dd405aee5 100644
--- a/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala
@@ -79,7 +79,7 @@ class KafkaRDD[
   override def compute(thePart: Partition, context: TaskContext) = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     if (part.fromOffset >= part.untilOffset) {
-      log.warn("Beginning offset is same or after ending offset" +
+      log.warn("Beginning offset is the same as or after the ending offset, " +
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
@@ -87,7 +87,7 @@
       context.addTaskCompletionListener{ context => closeIfNeeded() }
 
       val kc = new KafkaCluster(kafkaParams)
-      log.info(s"Computing topic ${part.topic}, partition ${part.partition}" +
+      log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
         s"offsets ${part.fromOffset} -> ${part.untilOffset}")
       val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
         .newInstance(kc.config.props)
@@ -97,7 +97,7 @@
         .asInstanceOf[Decoder[V]]
       val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
         errs => throw new Exception(
-          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}:" +
+          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
             errs.mkString("\n")),
         consumer => consumer
       )
diff --git a/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala
new file mode 100644
index 0000000000000..284c9d9dc996d
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd.kafka
+
+import scala.util.Random
+
+import kafka.serializer.StringDecoder
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.kafka.KafkaStreamSuiteBase
+
+class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var sc: SparkContext = _
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (sc != null) {
+      sc.stop()
+      sc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka RDD") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    sc = new SparkContext(sparkConf)
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+
+    val kc = new KafkaCluster(kafkaParams)
+
+    val rdd = getRdd(kc, Set(topic))
+    assert(rdd.isDefined)
+    assert(rdd.get.countByValue.size === sent.size)
+
+    kc.setConsumerOffsets(kafkaParams("group.id"), rdd.get.untilOffsets)
+
+    val rdd2 = getRdd(kc, Set(topic))
+    assert(rdd2.isDefined)
+    assert(rdd2.get.count === 0)
+  }
+
+  private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
+    val groupId = kc.kafkaParams("group.id")
+    for {
+      topicPartitions <- kc.getPartitions(topics).right.toOption
+      from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption)
+      until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
+    } yield {
+      new KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+        sc, kc.kafkaParams, from, until, mmd => mmd.message)
+    }
+  }
+}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 52454f4206d15..629a758f6f1a3 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -56,7 +56,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Loggin
   private val zkSessionTimeout = 6000
   private var zookeeper: EmbeddedZookeeper = _
   private var zkPort: Int = 0
-  private var brokerPort = 9092
+  protected var brokerPort = 9092
   private var brokerConf: KafkaConfig = _
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
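
Note for reviewers (not part of the patch itself): the test's getRdd helper shows the intended calling pattern for the new KafkaRDD, namely resolving per-partition offset ranges through KafkaCluster and then constructing the RDD with explicit from/until offsets. Below is a minimal standalone sketch of the same pattern with hand-picked offsets. The topic name, broker address, offset values, object name, and local-mode config are illustrative assumptions, not part of the patch; the constructor usage is mirrored from KafkaRDDSuite above.

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.kafka.KafkaRDD

object KafkaRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("KafkaRDDExample"))

    // Broker address is a placeholder. Only broker addresses are needed,
    // not zookeeper.connect, since the RDD talks to leaders directly.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

    // Hand-picked offset range for partition 0 of "topic1": [0, 100).
    // In practice these maps would come from KafkaCluster, e.g.
    // getConsumerOffsets / getLatestLeaderOffsets as in getRdd above.
    val tp = TopicAndPartition("topic1", 0)
    val fromOffsets = Map(tp -> 0L)
    val untilOffsets = Map(tp -> 100L)

    // Type parameters: key type, value type, key decoder, value decoder,
    // and the result type of the message handler.
    val rdd = new KafkaRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, fromOffsets, untilOffsets, mmd => mmd.message)

    // Each record is the decoded message value for the requested range.
    println(rdd.count)
    sc.stop()
  }
}

Running this sketch assumes a broker is reachable at localhost:9092 with a "topic1" partition 0 that actually covers the given offsets. Because the offset range is fixed when the RDD is defined, recomputing a partition reads back exactly the same messages.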