From bb4729a11e8625ae5be7404200e25123cea65ad6 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 25 Nov 2014 07:22:08 -0800
Subject: [PATCH] Same treatment for saveAsHadoopFiles

---
 .../dstream/PairDStreamFunctions.scala        |  8 +++--
 .../spark/streaming/CheckpointSuite.scala     | 36 +++++++++++++++----
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 562fe35fb43f3..98539e06b4e29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -668,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf
+      conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration)
     ) {
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
-      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
+      rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
     }
     self.foreachRDD(saveFunc)
   }
@@ -701,7 +703,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)])
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
       conf: Configuration = ssc.sparkContext.hadoopConfiguration
     ) {
-    // Wrap this in SerializableWritable so that ForeachDStream can be serialized for checkpoints
+    // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints
     val serializableConf = new SerializableWritable(conf)
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index acd793dd76281..c97998add8ffa 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -22,15 +22,18 @@ import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+
 import com.google.common.io.Files
-import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.io.{IntWritable, Text}
+import org.apache.hadoop.mapred.TextOutputFormat
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.hadoop.io.{Text, IntWritable}
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -207,20 +210,19 @@ class CheckpointSuite extends TestSuiteBase {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
-  test("recovery with saveAsNewAPIHadoopFiles") {
+  test("recovery with saveAsHadoopFiles operation") {
     val tempDir = Files.createTempDir()
     try {
       testCheckpointedOperation(
         Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
         (s: DStream[String]) => {
           val output = s.map(x => (x, 1)).reduceByKey(_ + _)
-          output.saveAsNewAPIHadoopFiles(
+          output.saveAsHadoopFiles(
             tempDir.toURI.toString,
             "result",
             classOf[Text],
             classOf[IntWritable],
             classOf[TextOutputFormat[Text, IntWritable]])
-          (tempDir.toString, "result")
           output
         },
         Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
@@ -231,6 +233,28 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  test("recovery with saveAsNewAPIHadoopFiles operation") {
+    val tempDir = Files.createTempDir()
+    try {
+      testCheckpointedOperation(
+        Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+        (s: DStream[String]) => {
+          val output = s.map(x => (x, 1)).reduceByKey(_ + _)
+          output.saveAsNewAPIHadoopFiles(
+            tempDir.toURI.toString,
+            "result",
+            classOf[Text],
+            classOf[IntWritable],
+            classOf[NewTextOutputFormat[Text, IntWritable]])
+          output
+        },
+        Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+        3
+      )
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
   // This tests whether the StateDStream's RDD checkpoints works correctly such
   // that the system can recover from a master failure. This assumes as reliable,
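Background note (not part of the patch): the wrapping matters because JobConf, like the new-API Configuration handled a few lines below, is a Hadoop Writable but not java.io.Serializable, so a ForeachDStream function that captures it directly cannot be Java-serialized when the DStream graph is checkpointed. The following is a minimal sketch of that behavior, assuming a Spark 1.x classpath where org.apache.spark.SerializableWritable is public; the object name SerializableConfSketch and the helper javaSerialize are made up for illustration.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable

object SerializableConfSketch {
  // Java-serialize an object the same way DStream checkpointing would.
  private def javaSerialize(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(obj) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new JobConf()

    // A raw JobConf is a Hadoop Writable but not java.io.Serializable,
    // so Java serialization of anything that captures it fails.
    try javaSerialize(conf) catch {
      case e: NotSerializableException => println(s"raw JobConf: $e")
    }

    // Wrapping it in SerializableWritable routes the bytes through the
    // Writable protocol inside writeObject/readObject, so the same
    // Java serialization succeeds; the patch unwraps it later with .value.
    javaSerialize(new SerializableWritable(conf))
    println("wrapped JobConf serialized fine")
  }
}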