[SPARK-2075][Core] Make the compiler generate the same bytecode for Hadoop 1.+ and Hadoop 2.+ #3740

Closed
wants to merge 5 commits
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}

/**
* Uses reflection to get the Configuration from a JobContext/TaskAttemptContext. If we call
* `JobContext/TaskAttemptContext.getConfiguration` directly, the compiler will generate
* different bytecode for Hadoop 1.+ and Hadoop 2.+, because JobContext/TaskAttemptContext is
* a class in Hadoop 1.+ but an interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}
}

object SparkHadoopUtil {
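For reference, a minimal usage sketch of the new helper (the wrapper method and the property name below are illustrative, not from this PR). Going through reflection keeps the call-site bytecode identical on both versions: a direct `context.getConfiguration` compiles to an invokevirtual instruction against Hadoop 1.+ (where the context types are classes) but to an invokeinterface instruction against Hadoop 2.+ (where they are interfaces).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.deploy.SparkHadoopUtil

// Illustrative call site: fetch the job Configuration without referencing
// JobContext/TaskAttemptContext.getConfiguration directly.
def exampleRecordLength(context: JobContext): Int = {
  val conf: Configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
  conf.getInt("example.record.length", 0)  // hypothetical property name
}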
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.spark.deploy.SparkHadoopUtil

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat {

/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt
}
}

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.deploy.SparkHadoopUtil

/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
val job = context.getConfiguration
val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt
import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit}

import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil

/**
* A general format for reading whole files in as streams, byte arrays,
@@ -145,7 +146,8 @@ class PortableDataStream(

private val confBytes = {
val baos = new ByteArrayOutputStream()
context.getConfiguration.write(new DataOutputStream(baos))
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
write(new DataOutputStream(baos))
baos.toByteArray
}

core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.deploy.SparkHadoopUtil


/**
@@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[String, String] with Configurable {

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
private[this] val fs = path.getFileSystem(
SparkHadoopUtil.get.getConfigurationFromJobContext(context))

// True means the current file has been processed, then skip it.
private[this] var processed = false
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1174,15 +1174,32 @@ abstract class RDD[T: ClassTag](
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
// https://issues.apache.org/jira/browse/SPARK-2075
//
// NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
// Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
// in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
// Ordering for `NullWritable`. That's why the compiler generates different anonymous
// classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
//
// Therefore, we provide an explicit Ordering `null` here to make sure the compiler generates
// the same bytecode for `saveAsTextFile`.
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
Contributor: These can be reused here too.

RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
Contributor: Is the problem here that when compiling against Hadoop 2, the compiler chooses to supply the Ordering to the implicit rddToPairRDDFunctions, while against Hadoop 1 it instead uses the default method (which returns null) when invoking the implicit?

I wonder if a more explicit solution, like introducing a conversion to PairRDDFunctions that takes an Ordering, is warranted for these cases, e.g.:

this.map(x => (NullWritable.get(), new Text(x.toString)))
  .toPairRDD(nullWritableOrdering)
  .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

This would be less magical about why the definition of an implicit Ordering changes the bytecode.

Member (Author): Right. An explicit solution is better for such a tricky issue.
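A rough sketch of the explicit conversion the reviewer describes (hypothetical, not part of this PR; `toPairRDD` and `nullWritableOrdering` are the reviewer's suggested names). Taking the Ordering as an explicit argument means no implicit Ordering lookup happens at the call site, so the emitted bytecode cannot depend on which Hadoop version is on the compile classpath:

import scala.reflect.ClassTag
import org.apache.spark.rdd.{PairRDDFunctions, RDD}

// Hypothetical helper (reviewer's suggestion, not in this PR), sketched under the assumption
// that PairRDDFunctions keeps its (ClassTag, ClassTag, Ordering) constructor.
def toPairRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)], ord: Ordering[K]): PairRDDFunctions[K, V] =
  new PairRDDFunctions(rdd)(implicitly[ClassTag[K]], implicitly[ClassTag[V]], ord)

// Usage, mirroring the reviewer's example (passing null where no ordering is needed):
// toPairRDD(this.map(x => (NullWritable.get(), new Text(x.toString))), null)
//   .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)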

}
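For illustration, a sketch of the two implicit expansions the comment above describes (an assumption about what the compiler resolves, written with the names used in this method):

// Hadoop 1.+ build: no Ordering[NullWritable] is found, so the default `null` is used:
//   RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
// Hadoop 2.+ build: NullWritable is Comparable[NullWritable], so without the explicit `null`
// the compiler would insert the `Ordering.ordered` implicit, materializing an extra anonymous class:
//   RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, Ordering.ordered[NullWritable])
// Supplying `null` explicitly makes both builds emit the same call.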

/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
// https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
Contributor: Just noticed we can reuse the text array here to reduce GC. Anyway, that's not part of this PR; would you be willing to submit a new PR for that?

Member (Author): OK. I'll send another PR for that after this one is merged.
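A hedged sketch of the reuse idea above (an assumption, not part of this PR): allocate one Text per partition and reset it for each record, which works because the Hadoop output format serializes each record before the next one is produced:

// Reuse a single Text instance per partition instead of allocating one per element.
val reused = this.mapPartitions { iter =>
  val text = new Text()
  iter.map { x =>
    text.set(x.toString)
    (NullWritable.get(), text)
  }
}
// `reused` would then be saved the same way as `r` below.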

RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
