[SPARK-14596][SQL] Remove unused SqlNewHadoopRDD and some more unused imports

## What changes were proposed in this pull request?

The old `HadoopFsRelation` API included `buildInternalScan()`, which used `SqlNewHadoopRDD` in `ParquetRelation`. Now that the old API has been removed, `SqlNewHadoopRDD` is no longer used.

This PR therefore removes `SqlNewHadoopRDD`, along with several unused imports.

This was discussed in #12326.

## How was this patch tested?

Existing related unit tests and `sbt scalastyle`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12354 from HyukjinKwon/SPARK-14596.
HyukjinKwon authored and cloud-fan committed Apr 14, 2016
1 parent 62b7f30 commit b481940
Showing 8 changed files with 16 additions and 310 deletions.
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,15 +213,13 @@ class HadoopRDD[K, V](
       logInfo("Input split: " + split.inputSplit)
       val jobConf = getJobConf()
 
-      // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
-
       val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
       val existingBytesRead = inputMetrics.bytesRead
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
-        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDDState.unsetInputFileName()
+        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
+        case _ => InputFileNameHolder.unsetInputFileName()
       }
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -271,7 +269,7 @@ class HadoopRDD[K, V](
 
       override def close() {
         if (reader != null) {
-          SqlNewHadoopRDDState.unsetInputFileName()
+          InputFileNameHolder.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala (renamed from SqlNewHadoopRDDState.scala)
@@ -20,10 +20,10 @@ package org.apache.spark.rdd
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * State for SqlNewHadoopRDD objects. This is split this way because of the package splits.
- * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ * This holds file names of the current Spark task. This is used in HadoopRDD,
+ * FileScanRDD and InputFileName function in Spark SQL.
  */
-private[spark] object SqlNewHadoopRDDState {
+private[spark] object InputFileNameHolder {
   /**
    * The thread variable for the name of the current file being read. This is used by
    * the InputFileName function in Spark SQL.
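For context, the renamed holder is just a per-thread wrapper around the current file name; the hunk above shows only its doc comment. A minimal sketch of what such a holder can look like (assuming an `InheritableThreadLocal` backing and `UTF8String` values, as the import above suggests; this is illustrative, not the verbatim Spark source):

```scala
import org.apache.spark.unsafe.types.UTF8String

// Sketch of a thread-local holder that HadoopRDD/FileScanRDD set before
// reading a file and clear afterwards.
object InputFileNameHolderSketch {
  // InheritableThreadLocal so threads spawned by a task see the same name.
  private val inputFileName: InheritableThreadLocal[UTF8String] =
    new InheritableThreadLocal[UTF8String] {
      override protected def initialValue(): UTF8String = UTF8String.fromString("")
    }

  def getInputFileName(): UTF8String = inputFileName.get()

  def setInputFileName(file: String): Unit =
    inputFileName.set(UTF8String.fromString(file))

  def unsetInputFileName(): Unit = inputFileName.remove()
}
```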
5 changes: 0 additions & 5 deletions project/MimaExcludes.scala
@@ -847,7 +847,6 @@ object MimaExcludes {
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
@@ -856,10 +855,8 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
@@ -870,7 +867,6 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
@@ -884,7 +880,6 @@
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
-      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
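These entries are MiMa (Migration Manager) filters: each one suppresses a binary-compatibility report for a class that was removed on purpose, so the deletions above drop the now-irrelevant `SqlNewHadoopRDD` entries. A sketch of the shape of such an entry (the class name here is hypothetical, not from this commit):

```scala
import com.typesafe.tools.mima.core._

// Deleting a class from the codebase makes MiMa report a MissingClassProblem
// against the previous release; an explicit filter acknowledges the break
// as intentional so the build stays green.
object ExampleExcludes {
  val filters: Seq[ProblemFilter] = Seq(
    ProblemFilters.exclude[MissingClassProblem]("org.example.RemovedClass")
  )
}
```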
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.rdd.SqlNewHadoopRDDState
+import org.apache.spark.rdd.InputFileNameHolder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
- * Expression that returns the name of the current file being read in using [[SqlNewHadoopRDD]]
+ * Expression that returns the name of the current file being read.
  */
 @ExpressionDescription(
   usage = "_FUNC_() - Returns the name of the current file being read if available",
@@ -40,12 +40,12 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
   override protected def initInternal(): Unit = {}
 
   override protected def evalInternal(input: InternalRow): UTF8String = {
-    SqlNewHadoopRDDState.getInputFileName()
+    InputFileNameHolder.getInputFileName()
   }
 
   override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
     ev.isNull = "false"
     s"final ${ctx.javaType(dataType)} ${ev.value} = " +
-      "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();"
+      "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();"
   }
 }
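The `InputFileName` expression backs the user-facing `input_file_name()` function in Spark SQL. A usage sketch (the input path is hypothetical; session setup uses the `SQLContext`-era API this commit targets):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.input_file_name

object InputFileNameDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("demo"))
    val sqlContext = new SQLContext(sc)

    // input_file_name() evaluates InputFileName, which reads the thread-local
    // value that HadoopRDD/FileScanRDD set for the current split.
    sqlContext.read.text("/tmp/input")               // hypothetical input directory
      .select(input_file_name().as("source_file"))
      .show(false)

    sc.stop()
  }
}
```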
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
+import org.apache.spark.rdd.{InputFileNameHolder, RDD}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 
@@ -37,7 +37,6 @@ case class PartitionedFile(
   }
 }
 
-
 /**
  * A collection of files that should be read as a single task possibly from multiple partitioned
  * directories.
@@ -50,7 +49,7 @@ class FileScanRDD(
     @transient val sqlContext: SQLContext,
     readFunction: (PartitionedFile) => Iterator[InternalRow],
     @transient val filePartitions: Seq[FilePartition])
-    extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+  extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
@@ -65,17 +64,17 @@ class FileScanRDD(
         if (files.hasNext) {
           val nextFile = files.next()
           logInfo(s"Reading File $nextFile")
-          SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+          InputFileNameHolder.setInputFileName(nextFile.filePath)
           currentIterator = readFunction(nextFile)
           hasNext
         } else {
-          SqlNewHadoopRDDState.unsetInputFileName()
+          InputFileNameHolder.unsetInputFileName()
           false
         }
       }
 
       override def close() = {
-        SqlNewHadoopRDDState.unsetInputFileName()
+        InputFileNameHolder.unsetInputFileName()
       }
     }
 
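FileScanRDD illustrates the discipline callers of the holder must follow: publish the file name before handing out the first record of a file, and clear it when the task runs out of files or the iterator is closed. A standalone sketch of the same pattern (using a plain `ThreadLocal`, since `InputFileNameHolder` is `private[spark]`; all names here are illustrative):

```scala
// Minimal holder standing in for InputFileNameHolder.
object CurrentFile {
  private val name = new ThreadLocal[String] { override def initialValue(): String = "" }
  def set(path: String): Unit = name.set(path)
  def unset(): Unit = name.remove()
  def get(): String = name.get()
}

// Iterator over (path, records) pairs that tracks the current file,
// mirroring the set/unset lifecycle in FileScanRDD.compute above.
class FileTrackingIterator[T](files: Iterator[(String, Iterator[T])])
    extends Iterator[T] with AutoCloseable {
  private var current: Iterator[T] = Iterator.empty

  override def hasNext: Boolean = current.hasNext || advance()

  private def advance(): Boolean =
    if (files.hasNext) {
      val (path, records) = files.next()
      CurrentFile.set(path) // set before any record of this file is returned
      current = records
      hasNext               // recurse in case this file is empty
    } else {
      CurrentFile.unset()   // clear once all files are exhausted
      false
    }

  override def next(): T = current.next() // assumes hasNext was checked
  override def close(): Unit = CurrentFile.unset()
}
```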
(Diffs for the remaining three changed files, including the deleted SqlNewHadoopRDD.scala, are not expanded here.)
