From 4854ee95b959dfac3242ce67c050cb4c2bfa1bba Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 10 Nov 2014 14:42:22 +0800 Subject: [PATCH] Address all the comments --- .../kafka/ReliableKafkaReceiver.scala | 101 +++++++++++------- .../kafka/ReliableKafkaStreamSuite.scala | 11 ++ 2 files changed, 75 insertions(+), 37 deletions(-) diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index d832a82aacf09..417b0464c1e86 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -18,14 +18,14 @@ package org.apache.spark.streaming.kafka import java.util.Properties -import java.util.concurrent.{ConcurrentHashMap, Executors} +import java.util.concurrent.ConcurrentHashMap import scala.collection.Map import scala.collection.mutable import scala.reflect.{classTag, ClassTag} import kafka.common.TopicAndPartition -import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector} +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream} import kafka.serializer.Decoder import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties} import org.I0Itec.zkclient.ZkClient @@ -33,6 +33,7 @@ import org.I0Itec.zkclient.ZkClient import org.apache.spark.{SparkEnv, Logging} import org.apache.spark.storage.{StreamBlockId, StorageLevel} import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver} +import org.apache.spark.util.Utils private[streaming] class ReliableKafkaReceiver[ @@ -45,27 +46,33 @@ class ReliableKafkaReceiver[ storageLevel: StorageLevel) extends Receiver[Any](storageLevel) with Logging { - /** High level consumer to connect to Kafka */ + /** High level consumer to connect to Kafka. */ private var consumerConnector: ConsumerConnector = null - /** zkClient to connect to Zookeeper to commit the offsets */ + /** zkClient to connect to Zookeeper to commit the offsets. */ private var zkClient: ZkClient = null private val groupId = kafkaParams("group.id") - private lazy val env = SparkEnv.get + private def conf() = SparkEnv.get.conf private val AUTO_OFFSET_COMMIT = "auto.commit.enable" /** A HashMap to manage the offset for each topic/partition, this HashMap is called in - * synchronized block, so mutable HashMap will not meet concurrency issue */ - private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] + * synchronized block, so mutable HashMap will not meet concurrency issue. + */ + private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null - /** A concurrent HashMap to store the stream block id and related offset snapshot */ - private lazy val blockOffsetMap = - new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] + /** A concurrent HashMap to store the stream block id and related offset snapshot. */ + private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null - private lazy val blockGeneratorListener = new BlockGeneratorListener { + /** Manage the BlockGenerator in receiver itself for better managing block store and offset + * commit. + */ + private var blockGenerator: BlockGenerator = null + + /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */ + private final class OffsetCheckpointListener extends BlockGeneratorListener { override def onStoreData(data: Any, metadata: Any): Unit = { if (metadata != null) { val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)] @@ -96,10 +103,6 @@ class ReliableKafkaReceiver[ } } - /** Manage the BlockGenerator in receiver itself for better managing block store and offset - * commit */ - private var blockGenerator: BlockGenerator = null - override def onStop(): Unit = { if (consumerConnector != null) { consumerConnector.shutdown() @@ -111,13 +114,33 @@ class ReliableKafkaReceiver[ zkClient = null } - blockGenerator.stop() + if (blockGenerator != null) { + blockGenerator.stop() + blockGenerator = null + } + + if (topicPartitionOffsetMap != null) { + topicPartitionOffsetMap.clear() + topicPartitionOffsetMap = null + } + + if (blockOffsetMap != null) { + blockOffsetMap.clear() + blockOffsetMap = null + } } override def onStart(): Unit = { logInfo(s"Starting Kafka Consumer Stream with group: $groupId") - blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf) + // Initialize the topic-partition / offset hash map. + topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] + + // Initialize the stream block id / offset snapshot hash map. + blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() + + // Initialize the block generator for storing Kafka message. + blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf()) if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") { logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " + @@ -133,7 +156,7 @@ class ReliableKafkaReceiver[ val consumerConfig = new ConsumerConfig(props) - assert(consumerConfig.autoCommitEnable == false) + assert(!consumerConfig.autoCommitEnable) logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}") consumerConnector = Consumer.create(consumerConfig) @@ -156,27 +179,12 @@ class ReliableKafkaReceiver[ val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) - val executorPool = Executors.newFixedThreadPool(topics.values.sum) + val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") try { topicMessageStreams.values.foreach { streams => streams.foreach { stream => - executorPool.submit(new Runnable { - override def run(): Unit = { - logInfo(s"Starting message process thread ${Thread.currentThread().getId}.") - try { - for (msgAndMetadata <- stream) { - val topicAndPartition = TopicAndPartition( - msgAndMetadata.topic, msgAndMetadata.partition) - val metadata = (topicAndPartition, msgAndMetadata.offset) - - blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata) - } - } catch { - case e: Throwable => logError("Error handling message; existing", e) - } - } - }) + executorPool.submit(new MessageHandler(stream)) } } } finally { @@ -184,13 +192,32 @@ class ReliableKafkaReceiver[ } } + /** A inner class to handle received Kafka message. */ + private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable { + override def run(): Unit = { + logInfo(s"Starting message process thread ${Thread.currentThread().getId}.") + try { + for (msgAndMetadata <- stream) { + val topicAndPartition = TopicAndPartition( + msgAndMetadata.topic, msgAndMetadata.partition) + val metadata = (topicAndPartition, msgAndMetadata.offset) + + blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata) + } + } catch { + case e: Throwable => logError("Error handling message; existing", e) + } + } + } + /** * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's * metadata schema in Zookeeper. */ private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = { if (zkClient == null) { - logError(s"zkClient $zkClient should be initialized at started") + val thrown = new IllegalStateException("Zookeeper client is unexpectedly null") + stop("Zookeeper client is not initialized before commit offsets to ZK", thrown) return } @@ -205,7 +232,7 @@ class ReliableKafkaReceiver[ s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t) } - logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " + + logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + s"partition ${topicAndPart.partition}") } } diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala index c2f318efa61b8..9a8557e496295 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala @@ -69,13 +69,17 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite { ssc.start() ssc.awaitTermination(3000) + // A basic process verification for ReliableKafkaReceiver. + // Verify whether received message number is equal to the sent message number. assert(sent.size === result.size) + // Verify whether each message is the same as the data to be verified. sent.keys.foreach { k => assert(sent(k) === result(k).toInt) } ssc.stop() } test("Verify the offset commit") { + // Verify the corretness of offset commit mechanism. val sparkConf = new SparkConf() .setMaster(master) .setAppName(framework) @@ -97,8 +101,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite { "group.id" -> groupId, "auto.offset.reset" -> "smallest") + // Verify whether the offset of this group/topic/partition is 0 before starting. assert(getCommitOffset(groupId, topic, 0) === 0L) + // Do this to consume all the message of this group/topic. val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, @@ -109,6 +115,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite { ssc.awaitTermination(3000) ssc.stop() + // Verify the offset number whether it is equal to the total message number. assert(getCommitOffset(groupId, topic, 0) === 29L) } @@ -136,8 +143,10 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite { "group.id" -> groupId, "auto.offset.reset" -> "smallest") + // Before started, verify all the group/topic/partition offsets are 0. topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) } + // Consuming all the data sent to the broker which will potential commit the offsets internally. val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, @@ -148,9 +157,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite { ssc.awaitTermination(3000) ssc.stop() + // Verify the offset for each group/topic to see whether they are equal to the expected one. topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) } } + /** Getting partition offset from Zookeeper. */ private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = { assert(zkClient != null, "Zookeeper client is not initialized")