From 96c7a1dd97a93d2abb4daad99b7ec24e5114356a Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Fri, 31 Oct 2014 13:31:07 +0800
Subject: [PATCH] Update the ReliableKafkaReceiver unit test

---
 .../kafka/ReliableKafkaStreamSuite.scala      | 73 +++++--------------
 1 file changed, 18 insertions(+), 55 deletions(-)

diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 83a88597e57c7..c2f318efa61b8 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.io.File
+
 import scala.collection.mutable
 
 import kafka.serializer.StringDecoder
@@ -25,6 +27,7 @@ import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.util.Utils
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
@@ -35,6 +38,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -73,6 +81,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -105,6 +118,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -133,61 +151,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
   }
 
-  test("Verify offset commit when exception is met") {
-    val sparkConf = new SparkConf()
-      .setMaster(master)
-      .setAppName(framework)
-      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
-      batchDuration)
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
-    topics.foreach { case (t, _) =>
-      createTopic(t)
-      produceAndSendMessage(t, sent)
-    }
-
-    val groupId = s"test-consumer-${random.nextInt(10000)}"
-
-    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest")
-
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => throw new Exception)
-    try {
-      ssc.start()
-      ssc.awaitTermination(1000)
-    } catch {
-      case e: Exception =>
-        if (ssc != null) {
-          ssc.stop()
-          ssc = null
-        }
-    }
-    // Failed before putting to BM, so offset is not updated.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
-
-    // Restart to see if data is consumed from last checkpoint.
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => Unit)
-    ssc.start()
-    ssc.awaitTermination(3000)
-    ssc.stop()
-
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
-  }
-
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")