From 75dba0747ec43b4cc421f984297b07c0bd75b989 Mon Sep 17 00:00:00 2001
From: Mark Jarvin
Date: Fri, 15 Dec 2023 16:33:17 -0500
Subject: [PATCH] [Spark] Minor changes around ExternalRDDScan operations

Add a `recordFrameProfile` around some potentially-long-running Delta
operations.

Some Spark jobs triggered by Delta have an `ExternalRDDScan` at the root and
can be very expensive in terms of runtime. This PR adds `recordFrameProfile`
to give a bit of additional visibility into the runtime of these operations.

Closes delta-io/delta#2380

GitOrigin-RevId: 7eba73fb2e5e7eaf45ecaea6730187116eadef43
---
 .../sql/delta/commands/VacuumCommand.scala    | 199 +++++++++---------
 .../delta/commands/convert/ConvertUtils.scala |  30 +--
 2 files changed, 119 insertions(+), 110 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
index 04c6839583e..c06c605f994 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala
@@ -202,119 +202,124 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
           duplicates.maxBy(_.modificationTime)
         }
 
-      try {
-        allFilesAndDirs.cache()
-
-        implicit val fileNameAndSizeEncoder = org.apache.spark.sql.Encoders.product[FileNameAndSize]
-
-        val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
-
-        // The logic below is as follows:
-        // 1. We take all the files and directories listed in our reservoir
-        // 2. We filter all files older than our tombstone retention period and directories
-        // 3. We get the subdirectories of all files so that we can find non-empty directories
-        // 4. We groupBy each path, and count to get how many files are in each sub-directory
-        // 5. We subtract all the valid files and tombstones in our state
-        // 6. We filter all paths with a count of 1, which will correspond to files not in the
-        //    state, and empty directories. We can safely delete all of these
-        val diff = allFilesAndDirs
-          .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
-          .mapPartitions { fileStatusIterator =>
-            val reservoirBase = new Path(basePath)
-            val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
-            fileStatusIterator.flatMap { fileStatus =>
-              if (fileStatus.isDir) {
-                Iterator.single(FileNameAndSize(
-                  relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
-              } else {
-                val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
-                val dirsWithSlash = dirs.map { p =>
-                  val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
-                  FileNameAndSize(relativizedPath, 0L)
+      recordFrameProfile("Delta", "VacuumCommand.gc") {
+        try {
+          allFilesAndDirs.cache()
+
+          implicit val fileNameAndSizeEncoder =
+            org.apache.spark.sql.Encoders.product[FileNameAndSize]
+
+          val dirCounts = allFilesAndDirs.where(col("isDir")).count() + 1 // +1 for the base path
+
+          // The logic below is as follows:
+          // 1. We take all the files and directories listed in our reservoir
+          // 2. We filter all files older than our tombstone retention period and directories
+          // 3. We get the subdirectories of all files so that we can find non-empty directories
+          // 4. We groupBy each path, and count to get how many files are in each sub-directory
+          // 5. We subtract all the valid files and tombstones in our state
+          // 6. We filter all paths with a count of 1, which will correspond to files not in the
+          //    state, and empty directories. We can safely delete all of these
+          val diff = allFilesAndDirs
+            .where(col("modificationTime") < deleteBeforeTimestamp || col("isDir"))
+            .mapPartitions { fileStatusIterator =>
+              val reservoirBase = new Path(basePath)
+              val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
+              fileStatusIterator.flatMap { fileStatus =>
+                if (fileStatus.isDir) {
+                  Iterator.single(FileNameAndSize(
+                    relativize(fileStatus.getHadoopPath, fs, reservoirBase, isDir = true), 0L))
+                } else {
+                  val dirs = getAllSubdirs(basePath, fileStatus.path, fs)
+                  val dirsWithSlash = dirs.map { p =>
+                    val relativizedPath = relativize(new Path(p), fs, reservoirBase, isDir = true)
+                    FileNameAndSize(relativizedPath, 0L)
+                  }
+                  dirsWithSlash ++ Iterator(
+                    FileNameAndSize(relativize(
+                      fileStatus.getHadoopPath, fs, reservoirBase, isDir = false),
+                      fileStatus.length))
                 }
-                dirsWithSlash ++ Iterator(
-                  FileNameAndSize(relativize(
-                    fileStatus.getHadoopPath, fs, reservoirBase, isDir = false), fileStatus.length))
               }
-            }
-          }.groupBy(col("path")).agg(count(new Column("*")).as("count"), sum("length").as("length"))
-          .join(validFiles, Seq("path"), "leftanti")
-          .where(col("count") === 1)
-
+            }.groupBy(col("path")).agg(count(new Column("*")).as("count"),
+              sum("length").as("length"))
+            .join(validFiles, Seq("path"), "leftanti")
+            .where(col("count") === 1)

-      val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
-      val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
-        0L
-      } else {
-        sizeOfDataToDeleteRow.getLong(0)
-      }
-      val diffFiles = diff
-        .select(col("path"))
-        .as[String]
-        .map { relativePath =>
-          assert(!stringToPath(relativePath).isAbsolute,
-            "Shouldn't have any absolute paths for deletion here.")
-          pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
+          val sizeOfDataToDeleteRow = diff.agg(sum("length").cast("long")).first()
+          val sizeOfDataToDelete = if (sizeOfDataToDeleteRow.isNullAt(0)) {
+            0L
+          } else {
+            sizeOfDataToDeleteRow.getLong(0)
           }
-      val timeTakenToIdentifyEligibleFiles =
-        System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
-      val numFiles = diffFiles.count()
-      if (dryRun) {
+          val diffFiles = diff
+            .select(col("path"))
+            .as[String]
+            .map { relativePath =>
+              assert(!stringToPath(relativePath).isAbsolute,
+                "Shouldn't have any absolute paths for deletion here.")
+              pathToString(DeltaFileOperations.absolutePath(basePath, relativePath))
+            }
+          val timeTakenToIdentifyEligibleFiles =
+            System.currentTimeMillis() - startTimeToIdentifyEligibleFiles
+
+          val numFiles = diffFiles.count()
+          if (dryRun) {
+            val stats = DeltaVacuumStats(
+              isDryRun = true,
+              specifiedRetentionMillis = retentionMillis,
+              defaultRetentionMillis = snapshotTombstoneRetentionMillis,
+              minRetainedTimestamp = deleteBeforeTimestamp,
+              dirsPresentBeforeDelete = dirCounts,
+              objectsDeleted = numFiles,
+              sizeOfDataToDelete = sizeOfDataToDelete,
+              timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
+              timeTakenForDelete = 0L)
+
+            recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
+            logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
+              s"a total of $dirCounts directories that are safe to delete.")
+
+            return diffFiles.map(f => stringToPath(f).toString).toDF("path")
+          }
+          logVacuumStart(
+            spark,
+            deltaLog,
+            path,
+            diffFiles,
+            sizeOfDataToDelete,
+            retentionMillis,
+            snapshotTombstoneRetentionMillis)
+
+          val deleteStartTime = System.currentTimeMillis()
+          val filesDeleted = try {
+            delete(diffFiles, spark, basePath,
+              hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
+          } catch {
+            case t: Throwable =>
+              logVacuumEnd(deltaLog, spark, path)
+              throw t
+          }
+          val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
           val stats = DeltaVacuumStats(
-          isDryRun = true,
+            isDryRun = false,
             specifiedRetentionMillis = retentionMillis,
             defaultRetentionMillis = snapshotTombstoneRetentionMillis,
             minRetainedTimestamp = deleteBeforeTimestamp,
             dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = numFiles,
+            objectsDeleted = filesDeleted,
             sizeOfDataToDelete = sizeOfDataToDelete,
             timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-          timeTakenForDelete = 0L)
-
+            timeTakenForDelete = timeTakenForDelete)
           recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-          logConsole(s"Found $numFiles files ($sizeOfDataToDelete bytes) and directories in " +
-            s"a total of $dirCounts directories that are safe to delete.")
+          logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))

-          return diffFiles.map(f => stringToPath(f).toString).toDF("path")
-        }
-        logVacuumStart(
-          spark,
-          deltaLog,
-          path,
-          diffFiles,
-          sizeOfDataToDelete,
-          retentionMillis,
-          snapshotTombstoneRetentionMillis)
-
-        val deleteStartTime = System.currentTimeMillis()
-        val filesDeleted = try {
-          delete(diffFiles, spark, basePath,
-            hadoopConf, parallelDeleteEnabled, parallelDeletePartitions)
-        } catch {
-          case t: Throwable =>
-            logVacuumEnd(deltaLog, spark, path)
-            throw t
+
+          spark.createDataset(Seq(basePath)).toDF("path")
+        } finally {
+          allFilesAndDirs.unpersist()
         }
-        val timeTakenForDelete = System.currentTimeMillis() - deleteStartTime
-        val stats = DeltaVacuumStats(
-          isDryRun = false,
-          specifiedRetentionMillis = retentionMillis,
-          defaultRetentionMillis = snapshotTombstoneRetentionMillis,
-          minRetainedTimestamp = deleteBeforeTimestamp,
-          dirsPresentBeforeDelete = dirCounts,
-          objectsDeleted = filesDeleted,
-          sizeOfDataToDelete = sizeOfDataToDelete,
-          timeTakenToIdentifyEligibleFiles = timeTakenToIdentifyEligibleFiles,
-          timeTakenForDelete = timeTakenForDelete)
-        recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-        logVacuumEnd(deltaLog, spark, path, Some(filesDeleted), Some(dirCounts))
-
-
-        spark.createDataset(Seq(basePath)).toDF("path")
-      } finally {
-        allFilesAndDirs.unpersist()
       }
     }
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
index 963bb04b6db..1b9adfb3344 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala
@@ -274,20 +274,24 @@ trait ConvertUtilsBase extends DeltaLogging {
       partitionSchema: StructType,
       convertTargetFiles: Dataset[ConvertTargetFile]): StructType = {
     import org.apache.spark.sql.delta.implicits._
-    val partiallyMergedSchemas = convertTargetFiles.mapPartitions { iterator =>
-      var dataSchema: StructType = StructType(Seq())
-      iterator.foreach { file =>
-        try {
-          dataSchema = SchemaMergingUtils.mergeSchemas(dataSchema,
-            StructType.fromDDL(file.parquetSchemaDDL.get).asNullable)
-        } catch {
-          case cause: AnalysisException =>
-            throw DeltaErrors.failedMergeSchemaFile(
-              file.fileStatus.path, StructType.fromDDL(file.parquetSchemaDDL.get).treeString, cause)
-        }
+    val partiallyMergedSchemas =
+      recordFrameProfile("Delta", "ConvertUtils.mergeSchemasInParallel") {
+        convertTargetFiles.mapPartitions { iterator =>
+          var dataSchema: StructType = StructType(Seq())
+          iterator.foreach { file =>
+            try {
+              dataSchema = SchemaMergingUtils.mergeSchemas(dataSchema,
+                StructType.fromDDL(file.parquetSchemaDDL.get).asNullable)
+            } catch {
+              case cause: AnalysisException =>
+                throw DeltaErrors.failedMergeSchemaFile(
+                  file.fileStatus.path, StructType.fromDDL(file.parquetSchemaDDL.get).treeString,
+                  cause)
+            }
+          }
+          Iterator.single(dataSchema.toDDL)
+        }.collect().filter(_.nonEmpty)
       }
-      Iterator.single(dataSchema.toDDL)
-    }.collect().filter(_.nonEmpty)
     if (partiallyMergedSchemas.isEmpty) {
       throw DeltaErrors.failedInferSchema