[SPARK-51185][CORE] Revert simplifications to PartitionedFileUtil API to reduce memory requirements

### What changes were proposed in this pull request?

This PR reverts an earlier change (apache#41632) that converted `FileStatusWithMetadata.getPath` from a `def` to a `lazy val` in order to simplify the `PartitionedFileUtil` helpers.

### Why are the changes needed?

Converting `getPath` from a `def` to a `lazy val` increases memory requirements because each resolved `Path` is now kept in memory for as long as its `FileStatusWithMetadata` exists. Since `Path` objects are relatively expensive to store, this can drive up memory utilization and increase the risk of OOMs.
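To illustrate the trade-off, here is a minimal sketch on hypothetical wrapper classes (not the real Spark API): a `lazy val` caches its result for the lifetime of the enclosing object, while a `def` recomputes it per call, so the result can be garbage-collected as soon as callers drop it.

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Illustrative only: a wrapper that caches the Path. The Path object is
// materialized on first access and then retained for as long as this
// wrapper lives, adding to the footprint of every cached file listing.
case class CachingWrapper(fileStatus: FileStatus) {
  lazy val getPath: Path = fileStatus.getPath
}

// Illustrative only: a wrapper that recomputes the Path. A caller that
// needs the Path repeatedly binds it to a local val, which becomes
// eligible for garbage collection as soon as it goes out of scope.
case class RecomputingWrapper(fileStatus: FileStatus) {
  def getPath: Path = fileStatus.getPath
}
```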

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This is a small revert back to code that existed before, so the existing tests are sufficient.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#49915 from LukasRupprecht/def_get-path.

Authored-by: Lukas Rupprecht <lukas.l.rupprecht@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
LukasRupprecht authored and cloud-fan committed Feb 13, 2025
1 parent 97372e0 commit 74d88b6
Showing 4 changed files with 18 additions and 10 deletions.
```diff
@@ -561,7 +561,9 @@ trait FileSourceScanLike extends DataSourceScanExec {
 
   override def toPartitionArray: Array[PartitionedFile] = {
     partitionDirectories.flatMap { p =>
-      p.files.map { f => PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen) }
+      p.files.map { f =>
+        PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen)
+      }
     }
   }
 
@@ -789,11 +791,14 @@ case class FileSourceScanExec(
     val splitFiles = selectedPartitions.filePartitionIterator.flatMap { partition =>
       val ListingPartition(partitionVals, _, fileStatusIterator) = partition
       fileStatusIterator.flatMap { file =>
-        if (shouldProcess(file.getPath)) {
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        if (shouldProcess(filePath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, file.getPath)
+            relation.sparkSession, relation.options, filePath)
           PartitionedFileUtil.splitFiles(
             file = file,
+            filePath = filePath,
             isSplitable = isSplitable,
             maxSplitBytes = maxSplitBytes,
             partitionValues = partitionVals
```
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.catalyst.InternalRow
@@ -26,27 +26,29 @@ import org.apache.spark.sql.execution.datasources._
 object PartitionedFileUtil {
   def splitFiles(
       file: FileStatusWithMetadata,
+      filePath: Path,
       isSplitable: Boolean,
       maxSplitBytes: Long,
       partitionValues: InternalRow): Seq[PartitionedFile] = {
     if (isSplitable) {
       (0L until file.getLen by maxSplitBytes).map { offset =>
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
-        getPartitionedFile(file, partitionValues, offset, size)
+        getPartitionedFile(file, filePath, partitionValues, offset, size)
       }
     } else {
-      Seq(getPartitionedFile(file, partitionValues, 0, file.getLen))
+      Seq(getPartitionedFile(file, filePath, partitionValues, 0, file.getLen))
     }
   }
 
   def getPartitionedFile(
       file: FileStatusWithMetadata,
+      filePath: Path,
       partitionValues: InternalRow,
       start: Long,
       length: Long): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length)
-    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), start, length, hosts,
+    PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, hosts,
       file.getModificationTime, file.getLen, file.metadata)
   }
 
```
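With the `filePath` parameter restored on these helpers, callers resolve the path once into a local and thread it through explicitly. A minimal sketch of that calling pattern, assuming `file: FileStatusWithMetadata` and `partitionValues: InternalRow` are already in scope (the surrounding values here are illustrative):

```scala
// Resolve the expensive Path exactly once per file, then pass it through
// explicitly instead of re-reading (or caching) it on the wrapper.
val filePath = file.getPath
val splits = PartitionedFileUtil.splitFiles(
  file = file,
  filePath = filePath,
  isSplitable = true,                  // e.g. fileFormat.isSplitable(..., filePath)
  maxSplitBytes = 128L * 1024 * 1024,  // illustrative 128 MiB target split size
  partitionValues = partitionValues
)
```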
```diff
@@ -34,8 +34,7 @@ import org.apache.spark.util.ArrayImplicits._
  */
 case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
   // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
-  // NOTE: getPath() is very expensive, so we only want to call it once (if accessed at all).
-  lazy val getPath: Path = fileStatus.getPath
+  def getPath: Path = fileStatus.getPath
   def getLen: Long = fileStatus.getLen
   def getModificationTime: Long = fileStatus.getModificationTime
   def isDirectory: Boolean = fileStatus.isDirectory
```
```diff
@@ -152,9 +152,11 @@ trait FileScan extends Scan
         partition.values
       }
       partition.files.flatMap { file =>
+        val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           file = file,
-          isSplitable = isSplitable(file.getPath),
+          filePath = filePath,
+          isSplitable = isSplitable(filePath),
           maxSplitBytes = maxSplitBytes,
           partitionValues = partitionValues
         )
```
