[SPARK-11678] [SQL] Partition discovery should stop at the root path of the table. #9651

Closed · wants to merge 7 commits
@@ -75,10 +75,11 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = {
typeInference: Boolean,
basePaths: Set[Path]): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference)
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference, basePaths)
}.unzip

// We create pairs of (path -> path's partition value) here
@@ -101,11 +102,15 @@ private[sql] object PartitioningUtils {
// It will be recognised as conflicting directory structure:
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
val basePaths = optBasePaths.flatMap(x => x)
val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\b" +
basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
"If provided paths are partition directories, please set " +
"\"basePath\" in the options of the data source to specify the " +
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")

val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

@@ -131,7 +136,7 @@ private[sql] object PartitioningUtils {

/**
* Parses a single partition, returns column names and values of each partition column, also
* the base path. For example, given:
* the path where partition discovery stops. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
@@ -144,40 +149,63 @@
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
* and the base path:
* and the path where the discovery stops is:
* {{{
* /path/to/partition
* hdfs://<host>:<port>/path/to/partition
* }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
typeInference: Boolean,
basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
var chopped = path
var basePath = path
// currentPath is the current path that we will use to parse partition column values.
var currentPath: Path = path

while (!finished) {
// Sometimes (e.g., when speculative tasks are enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (chopped.getName.toLowerCase == "_temporary") {
// uncleaned. Here we simply ignore them.
if (currentPath.getName.toLowerCase == "_temporary") {
return (None, None)
}

val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
basePath = chopped
chopped = chopped.getParent
finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
if (basePaths.contains(currentPath)) {
// If currentPath is one of the base paths, we should stop.
finished = true
} else {
// Let's say currentPath is "/table/a=1/"; then currentPath.getName will give us "a=1".
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)

// Now, we determine if we should stop.
// When we hit any of the following cases, we will stop:
// - In this iteration, we could not parse a partition column and its value,
//   i.e. maybeColumn is None, and columns is not empty. We check whether columns is
//   empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in
//   this case).
// - The currentPath has no parent, i.e. currentPath.getParent == null, which means
//   we have reached the root directory of the file system.
finished =
(maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null

if (!finished) {
// For the above example, currentPath will be "/table/".
currentPath = currentPath.getParent
}
}
}

if (columns.isEmpty) {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
(Some(PartitionValues(columnNames, values)), Some(basePath))
(Some(PartitionValues(columnNames, values)), Some(currentPath))
}
}
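To make the walk-up logic above concrete, here is a hedged, illustrative invocation. It assumes calling code inside the org.apache.spark.sql.execution.datasources package (the helper is private[sql]); the paths and expected results are hypothetical:

```scala
import org.apache.hadoop.fs.Path

// Walk up from a leaf path, collecting partition columns, and stop at the base path.
val (maybeValues, stopPath) = PartitioningUtils.parsePartition(
  path = new Path("hdfs://host:9000/table/a=42/b=hello"),
  defaultPartitionName = PartitioningUtils.DEFAULT_PARTITION_NAME,
  typeInference = true,
  basePaths = Set(new Path("hdfs://host:9000/table")))

// Expected (sketch): maybeValues holds columns Seq("a", "b") with values
// 42 (inferred as an integer) and "hello"; stopPath is the base path,
// Some(new Path("hdfs://host:9000/table")).
```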

@@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)

new JSONRelation(
None,
samplingRatio,
primitivesAsString,
dataSchema,
None,
partitionColumns,
paths)(sqlContext)
inputRDD = None,
samplingRatio = samplingRatio,
primitivesAsString = primitivesAsString,
maybeDataSchema = dataSchema,
maybePartitionSpec = None,
userDefinedPartitionColumns = partitionColumns,
paths = paths,
parameters = parameters)(sqlContext)
}
}

@@ -73,8 +74,10 @@ private[sql] class JSONRelation(
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
override val paths: Array[String] = Array.empty[String])(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec) {
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters) {

/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {
@@ -109,7 +109,7 @@ private[sql] class ParquetRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {

private[sql] def this(
@@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class TextRelation(
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
override val paths: Array[String] = Array.empty[String])
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec) {
extends HadoopFsRelation(maybePartitionSpec, parameters) {

/** Data schema is always a single column, named "text". */
override def dataSchema: StructType = new StructType().add("value", StringType)
@@ -414,12 +414,19 @@ abstract class OutputWriter {
* @since 1.4.0
*/
@Experimental
abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
abstract class HadoopFsRelation private[sql](
maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {

override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")

def this() = this(None)
def this() = this(None, Map.empty[String, String])

def this(parameters: Map[String, String]) = this(None, parameters)

private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
this(maybePartitionSpec, Map.empty[String, String])

private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)

@@ -519,13 +526,37 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
}

/**
* Base paths of this relation. For partitioned relations, it should be either root directories
* Paths of this relation. For partitioned relations, these should be the root directories
* of all partition directories.
*
* @since 1.4.0
*/
def paths: Array[String]

/**
* Contains a set of paths that are considered the base dirs of the input datasets.
* The partitioning discovery logic will stop when it reaches any base path.
* By default, the paths of the dataset provided by users are used as the base paths.
* For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
* will be `/path/something=true/`, and the returned DataFrame will not contain a column named
* `something`. If users want to override the base path, they can set `basePath` in the options
* to pass the new base path to the data source.
* For the above example, if the user-provided base path is `/path/`, the returned
* DataFrame will have a column named `something`.
*/
private def basePaths: Set[Path] = {
val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
userDefinedBasePath.getOrElse {
// If the user does not provide basePath, we will just use paths.
val pathSet = paths.toSet
pathSet.map(p => new Path(p))
}.map { hdfsPath =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = hdfsPath.getFileSystem(hadoopConf)
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
Contributor:
Considering parsePartitions asserts that basePaths.distinct.size == 1, users should either provide a basePath or pass only a single input path, right?

Contributor:
Ah, actually the basePaths in parsePartitions is something else entirely that happens to share the name.

}
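A hedged illustration of the user-facing behaviour this enables, mirroring the example in the scaladoc above (directory names are hypothetical):

```scala
// Reading a single partition directory directly: that directory is the base path,
// so "something" is not discovered as a partition column.
val df1 = sqlContext.read.parquet("/path/something=true/")

// Overriding the base path via the "basePath" option: discovery walks up to /path/,
// so the returned DataFrame does have a "something" column.
val df2 = sqlContext.read
  .option("basePath", "/path/")
  .parquet("/path/something=true/")
```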

override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray

override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
@@ -559,7 +590,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
userDefinedPartitionColumns match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = false,
basePaths = basePaths)

// Without auto inference, all of value in the `row` should be null or in StringType,
// we need to cast into the data type that user specified.
@@ -577,8 +611,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])

case _ =>
// user did not provide a partitioning schema
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
PartitioningUtils.parsePartitions(
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
basePaths = basePaths)
}
}

Expand Down
@@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("part = 1"),
sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"),
Contributor:
Why do we need getCanonicalPath here?

Contributor (author):
path is a partition dir and if we load that single dir, I am not sure we should attach part as a column to your table.

(1 to 3).map(i => Row(i, i.toString, 1)))
}
}
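The review exchange above boils down to the behaviour this patch introduces; a hedged restatement using the test's own variables (the comments are illustrative):

```scala
// `path` points at dir/part=1. Reading that partition directory directly now stops
// discovery there, so no "part" column is attached and the filter "part = 1" would
// not resolve. Reading the table root keeps "part" as a discovered partition column.
val withoutPartColumn = sqlContext.read.parquet(path)
val withPartColumn    = sqlContext.read.parquet(dir.getCanonicalPath)
```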
@@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}