diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 9077d1fa0d549..c7b3c4d33f796 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -70,7 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
+    val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
   }
 
   /**
@@ -143,121 +144,4 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
-
-  /**
-   * Create an reliable input stream that pulls messages from a Kafka Broker.
-   * @param ssc          StreamingContext object
-   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId      The group id for this consumer
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is
-   *                     consumed in its own thread
-   * @param storageLevel Storage level to use for storing the received objects
-   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   */
-  def createReliableStream(
-      ssc: StreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: Map[String, Int],
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
-    : ReceiverInputDStream[(String, String)] = {
-    val kafkaParams = Map[String, String](
-      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
-      "zookeeper.connection.timeout.ms" -> "10000")
-    createReliableStream[String, String, StringDecoder, StringDecoder](
-      ssc, kafkaParams, topics, storageLevel)
-  }
-
-  /**
-   * Create an reliable input stream that pulls messages from a Kafka Broker.
-   * @param ssc          StreamingContext object
-   * @param kafkaParams  Map of kafka configuration parameters,
-   *                     see http://kafka.apache.org/08/configuration.html
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is
-   *                     consumed in its own thread.
-   * @param storageLevel Storage level to use for storing the received objects
-   */
-  def createReliableStream[
-    K: ClassTag,
-    V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag](
-      ssc: StreamingContext,
-      kafkaParams: Map[String, String],
-      topics: Map[String, Int],
-      storageLevel: StorageLevel
-    ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
-   * @param jssc     JavaStreamingContext object
-   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
-   * @param groupId  The group id for this consumer
-   * @param topics   Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *                 in its own thread
-   */
-  def createReliableStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt]
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * @param jssc         JavaStreamingContext object
-   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..).
-   * @param groupId      The group id for this consumer.
-   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is
-   *                     consumed in its own thread.
-   * @param storageLevel RDD storage level.
-   */
-  def createReliableStream(
-      jssc: JavaStreamingContext,
-      zkQuorum: String,
-      groupId: String,
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[String, String] = {
-    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
-      storageLevel)
-  }
-
-  /**
-   * Create an reliable Java input stream that pulls messages form a Kafka Broker.
-   * @param jssc              JavaStreamingContext object
-   * @param keyTypeClass      Key type of RDD
-   * @param valueTypeClass    value type of RDD
-   * @param keyDecoderClass   Type of kafka key decoder
-   * @param valueDecoderClass Type of kafka value decoder
-   * @param kafkaParams       Map of kafka configuration parameters,
-   *                          see http://kafka.apache.org/08/configuration.html
-   * @param topics            Map of (topic_name -> numPartitions) to consume. Each partition is
-   *                          consumed in its own thread
-   * @param storageLevel      RDD storage level.
-   */
-  def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
-      jssc: JavaStreamingContext,
-      keyTypeClass: Class[K],
-      valueTypeClass: Class[V],
-      keyDecoderClass: Class[U],
-      valueDecoderClass: Class[T],
-      kafkaParams: JMap[String, String],
-      topics: JMap[String, JInt],
-      storageLevel: StorageLevel
-    ): JavaPairReceiverInputDStream[K, V] = {
-    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
-    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
-
-    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
-    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
-
-    createReliableStream[K, V, U, T](
-      jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
-  }
 }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index 57a66f4e64fd5..50bcd44ea5605 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -82,7 +82,6 @@ class ReliableKafkaReceiver[
     }
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      // TODO. this should be replaced to reliable store after WAL is ready.
      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
 
      // Commit and remove the related offsets.
@@ -120,7 +119,7 @@ class ReliableKafkaReceiver[
 
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
-        "otherwise we cannot enable reliable offset commit mechanism")
+        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
     }
 
     val props = new Properties()
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 0cf2752ebdb4d..0aa49987ea23a 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -30,7 +30,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
 
   test("Reliable Kafka input stream") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -40,7 +44,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "group.id" -> s"test-consumer-${random.nextInt(10000)}",
       "auto.offset.reset" -> "smallest")
 
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       Map(topic -> 1),
@@ -64,7 +68,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   }
 
   test("Verify the offset commit") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -78,7 +86,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
 
     assert(getCommitOffset(groupId, topic, 0) === 0L)
 
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       Map(topic -> 1),
@@ -92,7 +100,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   }
 
   test("Verify multiple topics offset commit") {
-    val ssc = new StreamingContext(master, framework, batchDuration)
+    val sparkConf = new SparkConf()
+      .setMaster(master)
+      .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+    val ssc = new StreamingContext(sparkConf, batchDuration)
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -108,7 +120,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     topics.foreach { case (t, _) =>
       assert(getCommitOffset(groupId, t, 0) === 0L)
     }
-    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
       ssc,
       kafkaParams,
       topics,
@@ -125,6 +137,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     val sparkConf = new SparkConf()
       .setMaster(master)
       .setAppName(framework)
+      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     var ssc = new StreamingContext(
       sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
       batchDuration)
@@ -141,7 +154,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "group.id" -> groupId,
       "auto.offset.reset" -> "smallest")
 
-    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
@@ -161,7 +174,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
 
     // Restart to see if data is consumed from last checkpoint.
     ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
+    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
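
Usage note (illustrative, not part of the patch above): with the createReliableStream variants removed, reliability is selected purely through configuration and callers keep using KafkaUtils.createStream. The sketch below shows how a driver program might enable the WAL-backed receiver after this change; the application name, checkpoint directory, ZooKeeper quorum, group id, and topic map are placeholders, and enabling checkpointing is assumed here as part of the usual write-ahead-log setup.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReliableKafkaExample {
      def main(args: Array[String]): Unit = {
        // The WAL flag is what the patch uses to switch KafkaInputDStream to the
        // reliable receiver; without it the plain Kafka receiver is used.
        val conf = new SparkConf()
          .setAppName("ReliableKafkaExample") // placeholder app name
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(conf, Seconds(2))
        ssc.checkpoint("/tmp/kafka-wal-checkpoint") // placeholder checkpoint directory

        // Same public API as before the patch; no separate createReliableStream call.
        val stream = KafkaUtils.createStream(
          ssc,
          "zkhost:2181",            // placeholder ZooKeeper quorum
          "example-consumer-group", // placeholder consumer group id
          Map("test" -> 1),         // placeholder topic -> number of consumer threads
          StorageLevel.MEMORY_AND_DISK_SER_2)

        stream.map(_._2).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }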