
[Spark] Minor changes around ExternalRDDScan operations
Add a `recordFrameProfile` call around some potentially long-running Delta operations.

Some Spark jobs triggered by Delta have an `ExternalRDDScan` at the root of their plan and can be very expensive in terms of runtime. This PR wraps these operations in `recordFrameProfile` to give a bit of additional visibility into their runtime (see the illustrative sketch before the diffs below).

Closes #2380

GitOrigin-RevId: 7eba73fb2e5e7eaf45ecaea6730187116eadef43
markj-db authored and vkorukanti committed Dec 20, 2023
1 parent e0835ba commit 75dba07
Showing 2 changed files with 119 additions and 110 deletions.
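
For orientation, here is a minimal, self-contained sketch of the pattern this commit applies. `FrameProfileSketch` and the body of its `recordFrameProfile` stand-in are illustrative assumptions, not Delta code; in Delta the real helper comes from the `DeltaLogging` trait and feeds the recorded frame into Delta's usage logging. Only the call shape, `recordFrameProfile("Delta", "<operation>") { ... }` wrapped around the long-running block, mirrors the diffs below.

object FrameProfileSketch {
  // Stand-in for DeltaLogging.recordFrameProfile: time a block of work and
  // report how long it took. The (group, name)(thunk) shape matches how the
  // call sites in this commit use the real helper.
  def recordFrameProfile[T](group: String, name: String)(thunk: => T): T = {
    val start = System.currentTimeMillis()
    try {
      thunk
    } finally {
      val elapsedMs = System.currentTimeMillis() - start
      println(s"[$group] $name took $elapsedMs ms")
    }
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a potentially expensive Spark job, e.g. one whose plan is
    // rooted at an ExternalRDDScan.
    val deleted = recordFrameProfile("Delta", "VacuumCommand.gc") {
      Thread.sleep(50)
      42L
    }
    println(s"objects deleted: $deleted")
  }
}

Wrapping the whole block rather than instrumenting individual Spark actions keeps the change small: the profiled frame covers whatever jobs the block triggers, including the ExternalRDDScan-rooted ones the commit message calls out.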
@@ -202,119 +202,124 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
           duplicates.maxBy(_.modificationTime)
         }
 
-    try {
-      allFilesAndDirs.cache()
-
-      implicit val fileNameAndSizeEncoder = org.apache.spark.sql.Encoders.product[FileNameAndSize]
-
-      val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
-
-      // The logic below is as follows:
-      // 1. We take all the files and directories listed in our reservoir
-      // 2. We filter all files older than our tombstone retention period and directories
-      // 3. We get the subdirectories of all files so that we can find non-empty directories
-      // 4. We groupBy each path, and count to get how many files are in each sub-directory
-      // 5. We subtract all the valid files and tombstones in our state
-      // 6. We filter all paths with a count of 1, which will correspond to files not in the
-      //    state, and empty directories. We can safely delete all of these
-      val diff = allFilesAndDirs
-        .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
-        .mapPartitions { fileStatusIterator =>
-          val reservoirBase = new Path(basePath)
-          val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
-          fileStatusIterator.flatMap { fileStatus =>
-            if (fileStatus.isDir) {
-              Iterator.single(FileNameAndSize(
-                relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
-            } else {
-              val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
-              val dirsWithSlash = dirs.map { p =>
-                val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
-                FileNameAndSize(relativizedPath, 0L)
-              }
-              dirsWithSlash ++ Iterator(
-                FileNameAndSize(relativize(
-                  fileStatus.getHadoopPath, fs, reservoirBase, isDir = false), fileStatus.length))
-            }
-          }
-        }.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
-        .join(validFiles, Seq("path"), "leftanti")
-        .where(col("count") === 1)
-
-      val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
-      val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
-        0L
-      } else {
-        sizeOfDataToDeleteRow.getLong(0)
-      }
-
-      val diffFiles = diff
-        .select(col("path"))
-        .as[String]
-        .map { relativePath =>
-          assert(!stringToPath(relativePath).isAbsolute,
-            "Shouldn't have any absolute paths for deletion here.")
-          pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
-        }
-      val timeTakenToIdentifyEligibleFiles =
-        System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
-
-      val numFiles = diffFiles.count()
-      if (dryRun) {
-        val stats = DeltaVacuumStats(
-          isDryRun = true,
-          specifiedRetentionMillis = retentionMillis,
-          defaultRetentionMillis = snapshotTombstoneRetentionMillis,
-          minRetainedTimestamp = deleteBeforeTimestamp,
-          dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = numFiles,
-          sizeOfDataToDelete = sizeOfDataToDelete,
-          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-          timeTakenForDelete = 0L)
-
-        recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-        logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
-          s"a total of $dirCounts directories that are safe to delete.")
-
-        return diffFiles.map(f => stringToPath(f).toString).toDF("path")
-      }
-      logVacuumStart(
-        spark,
-        deltaLog,
-        path,
-        diffFiles,
-        sizeOfDataToDelete,
-        retentionMillis,
-        snapshotTombstoneRetentionMillis)
-
-      val deleteStartTime = System.currentTimeMillis()
-      val filesDeleted = try {
-        delete(diffFiles, spark, basePath,
-          hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
-      } catch {
-        case t: Throwable =>
-          logVacuumEnd(deltaLog, spark, path)
-          throw t
-      }
-      val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
-      val stats = DeltaVacuumStats(
-        isDryRun = false,
-        specifiedRetentionMillis = retentionMillis,
-        defaultRetentionMillis = snapshotTombstoneRetentionMillis,
-        minRetainedTimestamp = deleteBeforeTimestamp,
-        dirsPresentBeforeDelete = dirCounts,
-        objectsDeleted = filesDeleted,
-        sizeOfDataToDelete = sizeOfDataToDelete,
-        timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-        timeTakenForDelete = timeTakenForDelete)
-      recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-      logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
-
-      spark.createDataset(Seq(basePath)).toDF("path")
-    } finally {
-      allFilesAndDirs.unpersist()
-    }
+    recordFrameProfile("Delta", "VacuumCommand.gc") {
+      try {
+        allFilesAndDirs.cache()
+
+        implicit val fileNameAndSizeEncoder =
+          org.apache.spark.sql.Encoders.product[FileNameAndSize]
+
+        val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
+
+        // The logic below is as follows:
+        // 1. We take all the files and directories listed in our reservoir
+        // 2. We filter all files older than our tombstone retention period and directories
+        // 3. We get the subdirectories of all files so that we can find non-empty directories
+        // 4. We groupBy each path, and count to get how many files are in each sub-directory
+        // 5. We subtract all the valid files and tombstones in our state
+        // 6. We filter all paths with a count of 1, which will correspond to files not in the
+        //    state, and empty directories. We can safely delete all of these
+        val diff = allFilesAndDirs
+          .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
+          .mapPartitions { fileStatusIterator =>
+            val reservoirBase = new Path(basePath)
+            val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
+            fileStatusIterator.flatMap { fileStatus =>
+              if (fileStatus.isDir) {
+                Iterator.single(FileNameAndSize(
+                  relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
+              } else {
+                val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
+                val dirsWithSlash = dirs.map { p =>
+                  val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
+                  FileNameAndSize(relativizedPath, 0L)
+                }
+                dirsWithSlash ++ Iterator(
+                  FileNameAndSize(relativize(
+                    fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
+                    fileStatus.length))
+              }
+            }
+          }.groupBy(col("path")).agg(count(new Column("*")).as("count"),
+            sum("length").as("length"))
+          .join(validFiles, Seq("path"), "leftanti")
+          .where(col("count") === 1)
+
+        val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
+        val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
+          0L
+        } else {
+          sizeOfDataToDeleteRow.getLong(0)
+        }
+
+        val diffFiles = diff
+          .select(col("path"))
+          .as[String]
+          .map { relativePath =>
+            assert(!stringToPath(relativePath).isAbsolute,
+              "Shouldn't have any absolute paths for deletion here.")
+            pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
+          }
+        val timeTakenToIdentifyEligibleFiles =
+          System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
+
+        val numFiles = diffFiles.count()
+        if (dryRun) {
+          val stats = DeltaVacuumStats(
+            isDryRun = true,
+            specifiedRetentionMillis = retentionMillis,
+            defaultRetentionMillis = snapshotTombstoneRetentionMillis,
+            minRetainedTimestamp = deleteBeforeTimestamp,
+            dirsPresentBeforeDelete = dirCounts,
+            objectsDeleted = numFiles,
+            sizeOfDataToDelete = sizeOfDataToDelete,
+            timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+            timeTakenForDelete = 0L)
+
+          recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
+          logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
+            s"a total of $dirCounts directories that are safe to delete.")
+
+          return diffFiles.map(f => stringToPath(f).toString).toDF("path")
+        }
+        logVacuumStart(
+          spark,
+          deltaLog,
+          path,
+          diffFiles,
+          sizeOfDataToDelete,
+          retentionMillis,
+          snapshotTombstoneRetentionMillis)
+
+        val deleteStartTime = System.currentTimeMillis()
+        val filesDeleted = try {
+          delete(diffFiles, spark, basePath,
+            hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
+        } catch {
+          case t: Throwable =>
+            logVacuumEnd(deltaLog, spark, path)
+            throw t
+        }
+        val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
+        val stats = DeltaVacuumStats(
+          isDryRun = false,
+          specifiedRetentionMillis = retentionMillis,
+          defaultRetentionMillis = snapshotTombstoneRetentionMillis,
+          minRetainedTimestamp = deleteBeforeTimestamp,
+          dirsPresentBeforeDelete = dirCounts,
+          objectsDeleted = filesDeleted,
+          sizeOfDataToDelete = sizeOfDataToDelete,
+          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+          timeTakenForDelete = timeTakenForDelete)
+        recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
+        logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
+
+        spark.createDataset(Seq(basePath)).toDF("path")
+      } finally {
+        allFilesAndDirs.unpersist()
+      }
+    }
   }
 }
@@ -274,20 +274,24 @@ trait ConvertUtilsBase extends DeltaLogging {
       partitionSchema: StructType,
       convertTargetFiles: Dataset[ConvertTargetFile]): StructType = {
     import org.apache.spark.sql.delta.implicits._
-    val partiallyMergedSchemas = convertTargetFiles.mapPartitions { iterator =>
-      var dataSchema: StructType = StructType(Seq())
-      iterator.foreach { file =>
-        try {
-          dataSchema = SchemaMergingUtils.mergeSchemas(dataSchema,
-            StructType.fromDDL(file.parquetSchemaDDL.get).asNullable)
-        } catch {
-          case cause: AnalysisException =>
-            throw DeltaErrors.failedMergeSchemaFile(
-              file.fileStatus.path, StructType.fromDDL(file.parquetSchemaDDL.get).treeString, cause)
-        }
-      }
-      Iterator.single(dataSchema.toDDL)
-    }.collect().filter(_.nonEmpty)
+    val partiallyMergedSchemas =
+      recordFrameProfile("Delta", "ConvertUtils.mergeSchemasInParallel") {
+        convertTargetFiles.mapPartitions { iterator =>
+          var dataSchema: StructType = StructType(Seq())
+          iterator.foreach { file =>
+            try {
+              dataSchema = SchemaMergingUtils.mergeSchemas(dataSchema,
+                StructType.fromDDL(file.parquetSchemaDDL.get).asNullable)
+            } catch {
+              case cause: AnalysisException =>
+                throw DeltaErrors.failedMergeSchemaFile(
+                  file.fileStatus.path, StructType.fromDDL(file.parquetSchemaDDL.get).treeString,
+                  cause)
+            }
+          }
+          Iterator.single(dataSchema.toDDL)
+        }.collect().filter(_.nonEmpty)
+      }
 
     if (partiallyMergedSchemas.isEmpty) {
       throw DeltaErrors.failedInferSchema
