[SPARK-2075][Core] Make the compiler generate same bytes code for Hadoop 1.+ and Hadoop 2.+ #3740

Closed · wants to merge 5 commits
Changes from 3 commits
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

@@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging {
Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
statisticsDataClass.getDeclaredMethod(methodName)
}

/**
* Uses reflection to get the Configuration from a JobContext/TaskAttemptContext. If we call
* `JobContext/TaskAttemptContext.getConfiguration` directly, the compiler generates different
* bytecode for Hadoop 1.+ and Hadoop 2.+, because JobContext/TaskAttemptContext is a class in
* Hadoop 1.+ but an interface in Hadoop 2.+.
*/
def getConfigurationFromJobContext(context: JobContext): Configuration = {
val method = context.getClass.getMethod("getConfiguration")
method.invoke(context).asInstanceOf[Configuration]
}
}

object SparkHadoopUtil {
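As an aside (not part of this patch), a minimal sketch of the incompatibility the helper above works around; the object name ConfAccessSketch is invented for illustration. Hadoop 1.+ declares JobContext/TaskAttemptContext as classes, so a direct call compiles to an INVOKEVIRTUAL instruction, while Hadoop 2.+ declares them as interfaces and the same source compiles to INVOKEINTERFACE; a jar built against one line can then fail against the other with java.lang.IncompatibleClassChangeError. The reflective form resolves the method at runtime and emits identical bytecode either way.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext

object ConfAccessSketch {
  // Direct call: INVOKEVIRTUAL when TaskAttemptContext is a class (Hadoop 1.+),
  // INVOKEINTERFACE when it is an interface (Hadoop 2.+).
  def directConf(context: TaskAttemptContext): Configuration =
    context.getConfiguration

  // Reflective call: the method is looked up on the runtime class of `context`,
  // so the compiled bytecode does not depend on which Hadoop line is on the classpath.
  def reflectiveConf(context: TaskAttemptContext): Configuration =
    context.getClass.getMethod("getConfiguration")
      .invoke(context).asInstanceOf[Configuration]
}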
core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.spark.deploy.SparkHadoopUtil

/**
* Custom Input Format for reading and splitting flat binary files that contain records,
@@ -33,7 +34,8 @@ private[spark] object FixedLengthBinaryInputFormat {

/** Retrieves the record length property from a Hadoop configuration */
def getRecordLength(context: JobContext): Int = {
context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
Contributor: no need to indent here - i can fix it when i commit

get(RECORD_LENGTH_PROPERTY).toInt
}
}

core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.deploy.SparkHadoopUtil

/**
* FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat.
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader
// the actual file we will be reading from
val file = fileSplit.getPath
// job configuration
val job = context.getConfiguration
val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
// check compression
val codec = new CompressionCodecFactory(job).getCodec(file)
if (codec != null) {
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -19,6 +19,8 @@ package org.apache.spark.input

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.spark.deploy.SparkHadoopUtil
Contributor: import is out of order here, but i can fix it when i commit this.


import scala.collection.JavaConversions._

import com.google.common.io.ByteStreams
@@ -145,7 +147,8 @@ class PortableDataStream(

private val confBytes = {
val baos = new ByteArrayOutputStream()
context.getConfiguration.write(new DataOutputStream(baos))
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
write(new DataOutputStream(baos))
baos.toByteArray
}

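For context (not part of this diff), the confBytes produced above can be turned back into a Configuration with the matching Writable round-trip; the method name deserializeConf is an invented illustration of the consumer side.

import java.io.{ByteArrayInputStream, DataInputStream}
import org.apache.hadoop.conf.Configuration

// Hypothetical counterpart to the write(...) call above: rebuilds the Configuration
// from the serialized bytes on the reading side.
private def deserializeConf(confBytes: Array[Byte]): Configuration = {
  val conf = new Configuration(false) // start empty; readFields fills in the serialized entries
  conf.readFields(new DataInputStream(new ByteArrayInputStream(confBytes)))
  conf
}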
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.deploy.SparkHadoopUtil


/**
@@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader(
extends RecordReader[String, String] with Configurable {

private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
private[this] val fs = path.getFileSystem(
SparkHadoopUtil.get.getConfigurationFromJobContext(context))

// True means the current file has been processed, then skip it.
private[this] var processed = false
23 changes: 21 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1174,15 +1174,34 @@ abstract class RDD[T: ClassTag](
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
// https://issues.apache.org/jira/browse/SPARK-2075
// NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+,
// so the compiler cannot find an implicit Ordering for it and will use the default `null`.
// It will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and
// Hadoop 2.+. Therefore, here we provide an Ordering for NullWritable so that the compiler
// will generate the same bytecode.
val nullWritableOrdering = new Ordering[NullWritable] {
override def compare(x: NullWritable, y: NullWritable): Int = 0
}
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
Contributor: here can be reused too

RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering)
Contributor: we can just pass null in for nullWritableOrdering, can't we?

Member (Author): Yes, we can, since Ordering won't be used.

.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
Contributor: Is the problem here that when compiling against Hadoop 2, the compiler chooses to supply the Ordering to the implicit rddToPairRDDFunctions, while against Hadoop 1 it instead uses the default method (returning null) to fill in the implicit?

I wonder if a more explicit solution, like the introduction of a conversion to PairRDDFunctions which takes an Ordering, is warranted for these cases, e.g.:

this.map(x => (NullWritable.get(), new Text(x.toString)))
  .toPairRDD(nullWritableOrdering)
  .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

This would be less magical in explaining why defining an implicit Ordering changes the bytecode.

Member (Author): Right. An explicit solution is better for such a tricky issue.

}
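A rough sketch of the explicit conversion suggested in the comment thread above; the names PairRDDConversions and toPairRDD are assumptions, not part of this patch. It reuses the RDD.rddToPairRDDFunctions call already present in this diff, but takes the Ordering as an ordinary parameter so the compiler never has to resolve (or default) an implicit Ordering at the call site.

import scala.reflect.ClassTag
import org.apache.spark.rdd.{PairRDDFunctions, RDD}

object PairRDDConversions {
  implicit class ExplicitPairRDD[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) {
    // The Ordering is passed explicitly, so the generated bytecode does not depend on
    // which Orderings happen to be derivable for K under a given Hadoop version.
    def toPairRDD(ord: Ordering[K]): PairRDDFunctions[K, V] =
      RDD.rddToPairRDDFunctions(self)(implicitly[ClassTag[K]], implicitly[ClassTag[V]], ord)
  }
}

Call sites would then read as in the reviewer's example: this.map(...).toPairRDD(nullWritableOrdering).saveAsHadoopFile[...](path).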

/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
// https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableOrdering = new Ordering[NullWritable] {
override def compare(x: NullWritable, y: NullWritable): Int = 0
}
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
Contributor: Just noticed we can reuse the Text here to reduce GC. Anyway, that's not part of this PR - would you be willing to submit a new PR for that?

Member (Author): OK. I'll send another PR for that after this one is merged.

RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
}
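For reference, the Text reuse deferred to a follow-up PR in the thread above might look roughly like this; it assumes the Hadoop writer consumes each record before the next one is produced, which holds for the saveAsHadoopFile path used here.

val r = this.mapPartitions { iter =>
  val text = new Text()                // one Text per partition instead of one per record
  iter.map { x =>
    text.set(x.toString)               // reuse the same Text instance for every record
    (NullWritable.get(), text)
  }
}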
