diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 93fab14d66f61..85b4a81e245dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1361,6 +1361,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val RECURSIVE_FILE_LOOKUP =
+    buildConf("spark.sql.sources.recursiveFileLookup")
+      .doc("When dynamic partitioning is used to write the output of UNION or UNION ALL " +
+        "queries into table files with hive.execution.engine=tez, HIVE-17275 adds a " +
+        "subdirectory under the table or partition location. If true, Spark lists files " +
+        "under each location directory recursively; if false (default), only direct files.")
+      .version("3.2.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -4235,6 +4246,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
 
+  def recursiveFileLookup: Boolean = getConf(SQLConf.RECURSIVE_FILE_LOOKUP)
+
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
   def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 6c3deee2c3173..b0782fd3d9e9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -94,10 +94,31 @@ class InMemoryFileIndex(
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedLeafDirToChildrenFiles = if (sparkSession.sessionState.conf.recursiveFileLookup) {
+      recursiveDirChildrenFiles(files)
+    } else {
+      files.toArray.groupBy(_.getPath.getParent)
+    }
     cachedPartitionSpec = null
   }
 
+  def recursiveDirChildrenFiles(files: mutable.LinkedHashSet[FileStatus])
+    : Map[Path, Array[FileStatus]] = {
+    // rootPaths are the table / partition locations
+    val rootParents = rootPaths.map(_.getParent).toSet
+    val reorganized = new mutable.LinkedHashMap[Path, mutable.LinkedHashSet[FileStatus]]()
+    files.foreach { f => // f is a leaf file, not a directory
+      var parent = f.getPath.getParent
+      // Register f under every ancestor directory up to and including the root
+      // path, so a lookup on the root also returns files in subdirectories.
+      while (parent != null && !rootParents.contains(parent)) {
+        reorganized.getOrElseUpdate(parent, new mutable.LinkedHashSet[FileStatus]()).add(f)
+        parent = parent.getParent
+      }
+    }
+    reorganized.map { case (dir, fileSet) => dir -> fileSet.toArray }.toMap
+  }
+
   override def equals(other: Any): Boolean = other match {
     case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index fcaf8df4f9a02..7a240a7da9000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -477,6 +477,40 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("recursive dir children files") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getPath).getFileSystem(
+        spark.sessionState.newHadoopConfWithOptions(Map.empty))
+      val allFiles = new mutable.LinkedHashSet[FileStatus]()
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      for (i <- 1 to 8) {
+        val file0 = new File(partitionDirectory, i + ".txt")
+        stringToFile(file0, "text")
+        allFiles.add(fs.getFileStatus(new Path(file0.getPath)))
+        if (i % 2 == 0) {
+          // Simulate HIVE-17275: some files land in a subdirectory of the partition
+          val subDirectory = new File(partitionDirectory, "i=" + i)
+          subDirectory.mkdir()
+          for (ii <- 1 to 8) {
+            val file1 = new File(subDirectory, ii + ".txt")
+            stringToFile(file1, "text")
+            allFiles.add(fs.getFileStatus(new Path(file1.getPath)))
+          }
+        }
+      }
+      val path = fs.getFileStatus(new Path(partitionDirectory.getPath)).getPath
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+      val files = fileIndex.listLeafFiles(Seq(path))
+      val leafDirToChildrenFiles = fileIndex.recursiveDirChildrenFiles(files)
+      // The partition directory itself plus the four subdirectories i=2, i=4, i=6, i=8
+      assert(leafDirToChildrenFiles.size == 5)
+      val actualFiles = leafDirToChildrenFiles(path)
+      assert(allFiles.size == actualFiles.length)
+      assert(actualFiles.forall(allFiles.contains))
+    }
+  }
+
   test("SPARK-31047 - Improve file listing for ViewFileSystem") {
     val path = mock(classOf[Path])
     val dfs = mock(classOf[ViewFileSystem])
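For reference, below is a dependency-free sketch of the grouping idea behind recursiveDirChildrenFiles (not part of the patch): each leaf file is attributed to every ancestor directory between the file and a root path, so a lookup keyed on the root directory also returns the files that HIVE-17275 placed in subdirectories. It uses java.nio.file.Path in place of Hadoop's Path, and names such as groupUnderRoots are illustrative only.

import java.nio.file.{Path, Paths}
import scala.collection.mutable

object RecursiveGroupingSketch {
  // Mirror of the patch's loop: walk each file's ancestors up to (but excluding)
  // the parents of the root paths, registering the file under every directory.
  def groupUnderRoots(files: Seq[Path], roots: Set[Path]): Map[Path, Seq[Path]] = {
    val rootParents = roots.flatMap(r => Option(r.getParent))
    val grouped = new mutable.LinkedHashMap[Path, mutable.ArrayBuffer[Path]]()
    for (f <- files) {
      var parent = f.getParent
      while (parent != null && !rootParents.contains(parent)) {
        grouped.getOrElseUpdate(parent, mutable.ArrayBuffer.empty[Path]) += f
        parent = parent.getParent
      }
    }
    grouped.map { case (dir, fs) => dir -> fs.toSeq }.toMap
  }

  def main(args: Array[String]): Unit = {
    val root = Paths.get("/warehouse/t/a=foo")
    val files = Seq(
      Paths.get("/warehouse/t/a=foo/1.txt"),      // direct child of the root
      Paths.get("/warehouse/t/a=foo/i=2/1.txt"))  // HIVE-17275-style subdirectory
    val grouped = groupUnderRoots(files, Set(root))
    assert(grouped(root).size == 2)               // the root sees both files
    assert(grouped(Paths.get("/warehouse/t/a=foo/i=2")).size == 1)
  }
}

With the patch applied, the behavior would be opted into per session via spark.conf.set("spark.sql.sources.recursiveFileLookup", "true") before the table's file index is built; since the conf is marked internal(), it is not listed in the public configuration docs.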