Commit 8e696c8

AL-4757: when refreshing InMemoryFileIndex, use recursiveDirChildrenFiles (apache#566)

* AL-4757 when refreshing InMemoryFileIndex, if recursiveFileLookup is true, use recursiveDirChildrenFiles

* AL-4757 add a unit test

jlf authored and zheniantoushipashi committed Aug 21, 2023
1 parent 25814af commit 8e696c8
Showing 3 changed files with 65 additions and 1 deletion.
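
For orientation, a minimal usage sketch (not part of the commit): the flag added below is an internal SQLConf entry, so it can be set on a session before a file listing is triggered. The table name "sales_tez" is hypothetical.

    // Sketch: enable recursive leaf-file grouping for locations that contain
    // Tez-generated subdirectories (HIVE-17275), then read a table as usual.
    spark.conf.set("spark.sql.sources.recursiveFileLookup", "true")
    spark.read.table("sales_tez").show()  // files in nested subdirs are now indexed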
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -1493,6 +1493,17 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val RECURSIVE_FILE_LOOKUP =
+    buildConf("spark.sql.sources.recursiveFileLookup")
+      .doc("If dynamic partitioning is used to write the output of UNION or UNION ALL " +
+        "queries into table files with hive.execution.engine=tez, HIVE-17275 adds a sub " +
+        "dir under the table or partition location. If true, Spark fetches all files under " +
+        "each location dir and its sub dirs; if false (the default), only files directly under it.")
+      .version("3.2.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -4595,6 +4606,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreDataLocality: Boolean = getConf(SQLConf.IGNORE_DATA_LOCALITY)
 
+  def recursiveFileLookup: Boolean = getConf(SQLConf.RECURSIVE_FILE_LOOKUP)
+
   def csvFilterPushDown: Boolean = getConf(CSV_FILTER_PUSHDOWN_ENABLED)
 
   def jsonFilterPushDown: Boolean = getConf(JSON_FILTER_PUSHDOWN_ENABLED)
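
The accessor above is how callers consult the flag; a hedged sketch of the call-site pattern, assuming a SparkSession named spark:

    // Sketch: any component holding a SparkSession reads the flag via SQLConf.
    val recursive: Boolean = spark.sessionState.conf.recursiveFileLookup
    if (recursive) {
      // group leaf files under every ancestor directory (see InMemoryFileIndex below)
    }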
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala

@@ -94,10 +94,29 @@ class InMemoryFileIndex(
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedLeafDirToChildrenFiles = if (sparkSession.sessionState.conf.recursiveFileLookup) {
+      recursiveDirChildrenFiles(files)
+    } else {
+      files.toArray.groupBy(_.getPath.getParent)
+    }
     cachedPartitionSpec = null
   }
 
+  def recursiveDirChildrenFiles(files: mutable.LinkedHashSet[FileStatus])
+    : Map[Path, Array[FileStatus]] = {
+    // rootPaths is the table / partition location
+    val rootParents = rootPaths.map(_.getParent).toSet
+    val reorganized = new mutable.LinkedHashMap[Path, mutable.LinkedHashSet[FileStatus]]()
+    files.foreach { f => // f is a file, not a directory; attribute it to each ancestor dir
+      var parent = f.getPath.getParent
+      while (parent != null && !rootParents.contains(parent)) {
+        reorganized.getOrElseUpdate(parent, new mutable.LinkedHashSet[FileStatus]()).add(f)
+        parent = parent.getParent
+      }
+    }
+    reorganized.mapValues(_.toArray).toMap
+  }
+
   override def equals(other: Any): Boolean = other match {
     case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
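
To make the grouping concrete, an illustrative sketch (not part of the commit) of the ancestor walk that recursiveDirChildrenFiles performs, on hypothetical paths:

    import org.apache.hadoop.fs.Path

    // A leaf file inside a HIVE-17275-style subdirectory of the partition location.
    val root = new Path("/warehouse/tbl/a=foo")            // a rootPaths entry
    val leaf = new Path("/warehouse/tbl/a=foo/i=2/1.txt")  // nested data file

    // Walk upward until the root's parent: the file is attributed to every
    // ancestor, so it appears under both "i=2" and "a=foo" in the result map.
    var parent = leaf.getParent
    while (parent != null && parent != root.getParent) {
      println(parent)  // prints .../a=foo/i=2, then .../a=foo
      parent = parent.getParent
    }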
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala

@@ -477,6 +477,38 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
+  test("recursive dir children files") {
+    withTempDir { dir =>
+      val fs = new Path(dir.getPath).getFileSystem(
+        spark.sessionState.newHadoopConfWithOptions(Map.empty))
+      val allFiles = new mutable.LinkedHashSet[FileStatus]()
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      for (i <- 1 to 8) {
+        val file0 = new File(partitionDirectory, i + ".txt")
+        stringToFile(file0, "text")
+        allFiles.add(fs.getFileStatus(new Path(file0.getPath)))
+        if (i % 2 == 0) {
+          val subDirectory = new File(partitionDirectory, "i=" + i)
+          subDirectory.mkdir()
+          for (ii <- 1 to 8) {
+            val file1 = new File(subDirectory, ii + ".txt")
+            stringToFile(file1, "text")
+            allFiles.add(fs.getFileStatus(new Path(file1.getPath)))
+          }
+        }
+      }
+      val path = fs.getFileStatus(new Path(partitionDirectory.getPath)).getPath
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+      val files = fileIndex.listLeafFiles(Seq(path))
+      val leafDirToChildrenFiles = fileIndex.recursiveDirChildrenFiles(files)
+      assert(leafDirToChildrenFiles.size == 5)
+      val actualFiles = leafDirToChildrenFiles(path)
+      assert(allFiles.size == actualFiles.length)
+      assert(actualFiles.forall(allFiles.contains))
+    }
+  }
+
   test("SPARK-31047 - Improve file listing for ViewFileSystem") {
     val path = mock(classOf[Path])
     val dfs = mock(classOf[ViewFileSystem])
