From 8838ad7c135a585cde015dc38b5cb23314502dd9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Nov 2014 14:16:27 -0800 Subject: [PATCH] [SPARK-4196][SPARK-4602][Streaming] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles Solves two JIRAs in one shot - Makes the ForechDStream created by saveAsNewAPIHadoopFiles serializable for checkpoints - Makes the default configuration object used saveAsNewAPIHadoopFiles be the Spark's hadoop configuration Author: Tathagata Das Closes #3457 from tdas/savefiles-fix and squashes the following commits: bb4729a [Tathagata Das] Same treatment for saveAsHadoopFiles b382ea9 [Tathagata Das] Fix serialization issue in PairDStreamFunctions.saveAsNewAPIHadoopFiles. --- .../dstream/PairDStreamFunctions.scala | 30 +++++----- .../spark/streaming/CheckpointSuite.scala | 56 ++++++++++++++++++- 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 3f03f42270252..98539e06b4e29 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -17,20 +17,17 @@ package org.apache.spark.streaming.dstream -import org.apache.spark.streaming.StreamingContext._ - -import org.apache.spark.{Partitioner, HashPartitioner} -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.conf.Configuration -import org.apache.spark.streaming.{Time, Duration} +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} + +import org.apache.spark.{HashPartitioner, Partitioner, SerializableWritable} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Duration, Time} +import org.apache.spark.streaming.StreamingContext._ /** * Extra functions available on DStream of (key, value) pairs through an implicit conversion. @@ -671,11 +668,13 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf = new JobConf + conf: JobConf = new JobConf(ssc.sparkContext.hadoopConfiguration) ) { + // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints + val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value) } self.foreachRDD(saveFunc) } @@ -702,11 +701,14 @@ class PairDStreamFunctions[K, V](self: DStream[(K,V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = new Configuration + conf: Configuration = ssc.sparkContext.hadoopConfiguration ) { + // Wrap conf in SerializableWritable so that ForeachDStream can be serialized for checkpoints + val serializableConf = new SerializableWritable(conf) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) - rdd.saveAsNewAPIHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) + rdd.saveAsNewAPIHadoopFile( + file, keyClass, valueClass, outputFormatClass, serializableConf.value) } self.foreachRDD(saveFunc) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 77ff1ca780a58..c97998add8ffa 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -22,9 +22,14 @@ import java.nio.charset.Charset import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag + import com.google.common.io.Files -import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.io.{IntWritable, Text} +import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} + import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock @@ -205,6 +210,51 @@ class CheckpointSuite extends TestSuiteBase { testCheckpointedOperation(input, operation, output, 7) } + test("recovery with saveAsHadoopFiles operation") { + val tempDir = Files.createTempDir() + try { + testCheckpointedOperation( + Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), + (s: DStream[String]) => { + val output = s.map(x => (x, 1)).reduceByKey(_ + _) + output.saveAsHadoopFiles( + tempDir.toURI.toString, + "result", + classOf[Text], + classOf[IntWritable], + classOf[TextOutputFormat[Text, IntWritable]]) + output + }, + Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()), + 3 + ) + } finally { + Utils.deleteRecursively(tempDir) + } + } + + test("recovery with saveAsNewAPIHadoopFiles operation") { + val tempDir = Files.createTempDir() + try { + testCheckpointedOperation( + Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()), + (s: DStream[String]) => { + val output = s.map(x => (x, 1)).reduceByKey(_ + _) + output.saveAsNewAPIHadoopFiles( + tempDir.toURI.toString, + "result", + classOf[Text], + classOf[IntWritable], + classOf[NewTextOutputFormat[Text, IntWritable]]) + output + }, + Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()), + 3 + ) + } finally { + Utils.deleteRecursively(tempDir) + } + } // This tests whether the StateDStream's RDD checkpoints works correctly such // that the system can recover from a master failure. This assumes as reliable, @@ -391,7 +441,9 @@ class CheckpointSuite extends TestSuiteBase { logInfo("Manual clock after advancing = " + clock.time) Thread.sleep(batchDuration.milliseconds) - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] + val outputStream = ssc.graph.getOutputStreams.filter { dstream => + dstream.isInstanceOf[TestOutputStreamWithPartitions[V]] + }.head.asInstanceOf[TestOutputStreamWithPartitions[V]] outputStream.output.map(_.flatten) } }