[SPARK-2075][Core] Make the compiler generate the same bytecode for Hadoop 1.+ and Hadoop 2.+

`NullWritable` implements the raw `Comparable` rather than `Comparable[NullWritable]` in Hadoop 1.+, so the compiler cannot find an implicit Ordering for it and generates different anonymous classes for `saveAsTextFile` under Hadoop 1.+ and Hadoop 2.+. Therefore, here we provide an explicit Ordering for `NullWritable` so that the compiler generates the same bytecode.
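To see the difference concretely, here is a small illustration (not part of the patch; try it in a Scala REPL with the respective Hadoop version on the classpath):

```
import org.apache.hadoop.io.NullWritable

// Against Hadoop 2.+ this line compiles: NullWritable extends
// Comparable[NullWritable], so the compiler materializes an Ordering through
// the implicit `Ordering.ordered` (as an anonymous class). Against Hadoop 1.+
// NullWritable only extends the raw Comparable, the implicit search fails,
// and `rddToPairRDDFunctions` falls back to its default argument `null`.
val ord: Ordering[NullWritable] = implicitly[Ordering[NullWritable]]
```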

I used the following commands to confirm the generated bytecode is the same:
```
mvn -Dhadoop.version=1.2.1 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop1.txt

mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package -pl core -am
javap -private -c -classpath core/target/scala-2.10/classes org.apache.spark.rdd.RDD > ~/hadoop2.txt

diff ~/hadoop1.txt ~/hadoop2.txt
```

However, the compiler still generates different bytecode for classes that call methods of `JobContext`/`TaskAttemptContext`: `JobContext`/`TaskAttemptContext` are classes in Hadoop 1.+, so calls to their methods compile to `invokevirtual`, while they are interfaces in Hadoop 2.+, so the same calls compile to `invokeinterface`.
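A quick way to see this effect in isolation (illustrative only, not from the patch):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext

// Compile this once against Hadoop 1.x and once against Hadoop 2.x, then
// compare `javap -c Caller`: the call below compiles to invokevirtual when
// JobContext is a class (Hadoop 1.+) and to invokeinterface when it is an
// interface (Hadoop 2.+).
class Caller {
  def conf(context: JobContext): Configuration = context.getConfiguration
}
```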

To fix it, we can use reflection to call `JobContext/TaskAttemptContext.getConfiguration`.
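A minimal sketch of that reflective call (the actual helper, `getConfigurationFromJobContext`, is added to `SparkHadoopUtil` in the diff below):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext

// Look up getConfiguration at runtime: the emitted bytecode then only
// references java.lang.reflect.Method, so it is identical no matter
// whether JobContext is a class (Hadoop 1.+) or an interface (Hadoop 2.+).
def getConfiguration(context: JobContext): Configuration = {
  val method = context.getClass.getMethod("getConfiguration")
  method.invoke(context).asInstanceOf[Configuration]
}
```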

Author: zsxwing <zsxwing@gmail.com>

Closes #3740 from zsxwing/SPARK-2075 and squashes the following commits:

39d9df2 [zsxwing] Fix the code style
e4ad8b5 [zsxwing] Use null for the implicit Ordering
734bac9 [zsxwing] Explicitly set the implicit parameters
ca03559 [zsxwing] Use reflection to access JobContext/TaskAttemptContext.getConfiguration
fa40db0 [zsxwing] Add an Ordering for NullWritable to make the compiler generate same byte codes for RDD
zsxwing authored and rxin committed Dec 22, 2014
1 parent c6a3c0d · commit 6ee6aa7
Showing 6 changed files with 41 additions and 6 deletions.
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
      Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
    statisticsDataClass.getDeclaredMethod(methodName)
  }
+
+  /**
+   * Use reflection to get the Configuration from a JobContext/TaskAttemptContext. If we directly
+   * call `JobContext/TaskAttemptContext.getConfiguration`, the compiler will generate different
+   * bytecode for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is a class in
+   * Hadoop 1.+ while it's an interface in Hadoop 2.+.
+   */
+  def getConfigurationFromJobContext(context: JobContext): Configuration = {
+    val method = context.getClass.getMethod("getConfiguration")
+    method.invoke(context).asInstanceOf[Configuration]
+  }
}

object SparkHadoopUtil {
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+import org.apache.spark.deploy.SparkHadoopUtil

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat {

/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
-    context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
}
}

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.spark.deploy.SparkHadoopUtil

/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
-    val job = context.getConfiguration
+    val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.annotation.Experimental
+import org.apache.spark.deploy.SparkHadoopUtil

/**
* A general format for reading whole files in as streams, byte arrays,
@@ -145,7 +146,8 @@ class PortableDataStream(

private val confBytes = {
val baos = new ByteArrayOutputStream()
-    context.getConfiguration.write(new DataOutputStream(baos))
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context).
+      write(new DataOutputStream(baos))
baos.toByteArray
}

4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.deploy.SparkHadoopUtil


/**
@@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[String, String] with Configurable {

private[this] val path = split.getPath(index)
-  private[this] val fs = path.getFileSystem(context.getConfiguration)
+  private[this] val fs = path.getFileSystem(
+    SparkHadoopUtil.get.getConfigurationFromJobContext(context))

// True means the current file has been processed, then skip it.
private[this] var processed = false
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1174,15 +1174,32 @@ abstract class RDD[T: ClassTag](
* Save this RDD as a text file, using string representations of elements.
*/
  def saveAsTextFile(path: String) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    //
+    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
+    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
+    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
+    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
+    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
+    //
+    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generates
+    // the same bytecode for `saveAsTextFile`.
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}

/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
-    this.map(x => (NullWritable.get(), new Text(x.toString)))
+    // https://issues.apache.org/jira/browse/SPARK-2075
+    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
+    val textClassTag = implicitly[ClassTag[Text]]
+    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}

