From ab458d8bec3e682211a7aca07626c3ba0e5148db Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 15:15:16 -0700 Subject: [PATCH 1/9] - fix auto compaction to calculate the correct default minFileSize if the session config is unset; previously, large files counted towards minNumFiles - auto compaction should always evaluate whether there are sufficient small files; the prior code used an `OR` condition if auto compaction hadn't been run, which resulted in AC running too frequently. Signed-off-by: Miles Cole --- .../sql/delta/OptimisticTransaction.scala | 4 +- .../stats/AutoCompactPartitionStats.scala | 8 ++-- .../spark/sql/delta/AutoCompactSuite.scala | 43 +++++++++++++------ 3 files changed, 38 insertions(+), 17 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index af57cbaac9..f089eb1d28 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1108,9 +1108,11 @@ trait OptimisticTransactionImpl extends DeltaTransaction def createAutoCompactStatsCollector(): AutoCompactPartitionStatsCollector = { try { if (spark.conf.get(DeltaSQLConf.DELTA_AUTO_COMPACT_RECORD_PARTITION_STATS_ENABLED)) { + val maxFileSize = spark.conf + .get(DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE) val minFileSize = spark.conf .get(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_FILE_SIZE) - .getOrElse(Long.MaxValue) + .getOrElse(maxFileSize / 2L) return AutoCompactPartitionStats.instance(spark) .createStatsCollector(minFileSize, reportAutoCompactStatsError) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala index d9e24d89a4..04f6031ed7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala @@ -69,13 +69,13 @@ class AutoCompactPartitionStats( var wasAutoCompacted: Boolean = false) { /** - * Determine whether this partition can be autocompacted based on the number of small files or + * Determine whether this partition can be autocompacted based on the number of small files and * if this [[AutoCompactPartitionStats]] instance has not auto compacted it yet. * @param minNumFiles The minimum number of files this table-partition should have to trigger * Auto Compaction in case it has already been compacted once.
*/ - def hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles: Long): Boolean = - !wasAutoCompacted || hasSufficientFiles(minNumFiles) + def hasSufficientSmallFilesAndHasNotBeenCompacted(minNumFiles: Long): Boolean = + !wasAutoCompacted && hasSufficientFiles(minNumFiles) def hasSufficientFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles } @@ -305,7 +305,7 @@ class AutoCompactPartitionStats( tablePartitionStatsCache.get(tableId).map { tablePartitionStates => targetPartitions.filter { partitionKey => tablePartitionStates.get(partitionKey.##).exists { partitionState => - partitionState.hasSufficientSmallFilesOrHasNotBeenCompacted(minNumFiles) + partitionState.hasSufficientSmallFilesAndHasNotBeenCompacted(minNumFiles) } } }.getOrElse(Set.empty) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index dbdab26855..edc79ee9ca 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -283,34 +283,53 @@ class AutoCompactSuite extends } testBothModesViaProperty("auto compact should not kick in when there aren't " + - "enough files") { dir => - withSQLConf(DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "5") { + "enough small files") { dir => + withSQLConf( + DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6", + DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "3800b" + ) { AutoCompactPartitionStats.instance(spark).resetTestOnly() - spark.range(10).repartition(4).write.format("delta").mode("append").save(dir) + // First write - 4 small files + spark.range(10).repartition(4).write.format("delta").mode("append").save(dir) val deltaLog = DeltaLog.forTable(spark, dir) val newSnapshot = deltaLog.update() assert(newSnapshot.version === 0) - assert(deltaLog.update().numOfFiles === 4) + assert(deltaLog.update().numOfFiles === 4, "Should have 4 initial small files") + // Second write - 4 large files val isLogged2 = checkAutoOptimizeLogging { - spark.range(10).repartition(4).write.format("delta").mode("append").save(dir) + spark.range(1000).repartition(4).write.format("delta").mode("append").save(dir) } - assert(isLogged2) - val lastEvent = deltaLog.history.getHistory(Some(1)).head - assert(lastEvent.operation === "OPTIMIZE") - assert(lastEvent.operationParameters("auto") === "true") - - assert(deltaLog.update().numOfFiles === 1, "Files should be optimized into a single one") + val writeEvent = deltaLog.history.getHistory(Some(1)).head + assert(writeEvent.operation === "WRITE", + "Large files shouldn't trigger auto compaction") + assert(deltaLog.update().numOfFiles === 8, + "Should have 4 small + 4 large files") + + // Third write - 2 more small files to reach minNumFiles + spark.range(10).repartition(2).write.format("delta").mode("append").save(dir) + val compactionEvent = deltaLog.history.getHistory(Some(3)).head + assert(compactionEvent.operation === "OPTIMIZE", + "Should trigger compaction with 6 small files") + assert(compactionEvent.operationParameters("auto") === "true") + + val finalSnapshot = deltaLog.update() + assert(finalSnapshot.numOfFiles === 5, + "Should have 4 large files + 1 compacted small file") checkAnswer( spark.read.format("delta").load(dir), - spark.range(10).union(spark.range(10)).toDF() + spark.range(10) + .union(spark.range(1000)) + .union(spark.range(10)) + .toDF() ) } } + testBothModesViaProperty("ensure no NPE in auto compact UDF with null " + 
"partition values") { dir => Seq(null, "", " ").zipWithIndex.foreach { case (partValue, i) => From 10fe507a02f45749c2156ee14f23c38a67670a41 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 17:58:37 -0700 Subject: [PATCH 2/9] fix test suite --- .../apache/spark/sql/delta/AutoCompactSuite.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index edc79ee9ca..26a5556778 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -152,12 +152,13 @@ class AutoCompactSuite extends DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> s"true", DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "30") { val path = dir.getCanonicalPath - // Append 1 file to each partition: record runOnModifiedPartitions event, as is first write + // Append 1 file to each partition: record skipInsufficientFilesInModifiedPartitions event, + // as not enough small files exist var usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) { createFilesToPartitions(numFilePartitions = 3, numFilesPerPartition = 1, path) } var log = JsonUtils.mapper.readValue[Map[String, String]](usageLogs.head.blob) - assert(log("status") == "runOnModifiedPartitions" && log("partitions") == "3") + assert(log("status") == "skipInsufficientFilesInModifiedPartitions") // Append 10 more file to each partition: record skipInsufficientFilesInModifiedPartitions // event. usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) { @@ -196,8 +197,9 @@ class AutoCompactSuite extends df.write.format("delta").mode("append").save(dir) val deltaLog = DeltaLog.forTable(spark, dir) val newSnapshot = deltaLog.update() - assert(newSnapshot.version === 1) // 0 is the first commit, 1 is optimize - assert(deltaLog.update().numOfFiles === 1) + assert(newSnapshot.version === 0) // 0 is the first commit, no compaction + // due to the count of small files + assert(deltaLog.update().numOfFiles > 1) val isLogged = checkAutoOptimizeLogging { df.write.format("delta").mode("append").save(dir) @@ -286,7 +288,7 @@ class AutoCompactSuite extends "enough small files") { dir => withSQLConf( DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6", - DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "3800b" + DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "3800" ) { AutoCompactPartitionStats.instance(spark).resetTestOnly() From c915b792cb05cd8c029651c5ab04caa381ee4236 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 18:06:02 -0700 Subject: [PATCH 3/9] fix test suite Signed-off-by: Miles Cole --- .../org/apache/spark/sql/delta/AutoCompactSuite.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index 26a5556778..e067752962 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -300,10 +300,8 @@ class AutoCompactSuite extends assert(deltaLog.update().numOfFiles === 4, "Should have 4 initial small files") // Second write - 4 large files - val isLogged2 = checkAutoOptimizeLogging { - spark.range(1000).repartition(4).write.format("delta").mode("append").save(dir) - } - assert(isLogged2) + 
spark.range(1000).repartition(4).write.format("delta").mode("append").save(dir) + val writeEvent = deltaLog.history.getHistory(Some(1)).head assert(writeEvent.operation === "WRITE", "Large files shouldn't trigger auto compaction") @@ -311,7 +309,10 @@ class AutoCompactSuite extends "Should have 4 small + 4 large files") // Third write - 2 more small files to reach minNumFiles - spark.range(10).repartition(2).write.format("delta").mode("append").save(dir) + val isLogged2 = checkAutoOptimizeLogging { + spark.range(10).repartition(2).write.format("delta").mode("append").save(dir) + } + assert(isLogged2) val compactionEvent = deltaLog.history.getHistory(Some(3)).head assert(compactionEvent.operation === "OPTIMIZE", "Should trigger compaction with 6 small files") From 4d32201b2a7b84aac379af5bc381a606745b6b10 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 21:26:57 -0700 Subject: [PATCH 4/9] remove whitespace in test suite Signed-off-by: Miles Cole --- .../scala/org/apache/spark/sql/delta/AutoCompactSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index e067752962..d8cc6fe44f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -152,7 +152,7 @@ class AutoCompactSuite extends DeltaSQLConf.DELTA_AUTO_COMPACT_ENABLED.key -> s"true", DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "30") { val path = dir.getCanonicalPath - // Append 1 file to each partition: record skipInsufficientFilesInModifiedPartitions event, + // Append 1 file to each partition: record skipInsufficientFilesInModifiedPartitions event, // as not enough small files exist var usageLogs = captureOptimizeLogs(AutoCompact.OP_TYPE) { createFilesToPartitions(numFilePartitions = 3, numFilesPerPartition = 1, path) From 1fbcf9ab4fb71016aae6024b3d6e982f6a3c535f Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 21:51:15 -0700 Subject: [PATCH 5/9] fix maxFileSize for test Signed-off-by: Miles Cole --- .../scala/org/apache/spark/sql/delta/AutoCompactSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index d8cc6fe44f..5e65b45664 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -288,7 +288,7 @@ class AutoCompactSuite extends "enough small files") { dir => withSQLConf( DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6", - DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "3800" + DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "2000" ) { AutoCompactPartitionStats.instance(spark).resetTestOnly() From 9552af8d926debb003c3e80ff74be605c254177b Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Wed, 19 Feb 2025 21:53:01 -0700 Subject: [PATCH 6/9] fix test case --- .../scala/org/apache/spark/sql/delta/AutoCompactSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index 5e65b45664..c82b37c999 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala 
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -197,9 +197,8 @@ class AutoCompactSuite extends df.write.format("delta").mode("append").save(dir) val deltaLog = DeltaLog.forTable(spark, dir) val newSnapshot = deltaLog.update() - assert(newSnapshot.version === 0) // 0 is the first commit, no compaction - // due to the count of small files - assert(deltaLog.update().numOfFiles > 1) + assert(newSnapshot.version === 1) + assert(deltaLog.update().numOfFiles === 1) val isLogged = checkAutoOptimizeLogging { df.write.format("delta").mode("append").save(dir) From ff4d0698cc48ddab873a06366d569ff544fba5a9 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Thu, 20 Feb 2025 16:01:14 -0700 Subject: [PATCH 7/9] only check based on number of small files --- .../spark/sql/delta/stats/AutoCompactPartitionStats.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala index 04f6031ed7..6f316036bc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala @@ -75,7 +75,7 @@ class AutoCompactPartitionStats( * Auto Compaction in case it has already been compacted once. */ def hasSufficientSmallFilesAndHasNotBeenCompacted(minNumFiles: Long): Boolean = - !wasAutoCompacted && hasSufficientFiles(minNumFiles) + hasSufficientFiles(minNumFiles) def hasSufficientFiles(minNumFiles: Long): Boolean = numFiles >= minNumFiles } From 929359ab7106e03690f345798c9682871c8fbec3 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Thu, 20 Feb 2025 16:01:22 -0700 Subject: [PATCH 8/9] fix test case --- .../scala/org/apache/spark/sql/delta/AutoCompactSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala index c82b37c999..1f246b0153 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/AutoCompactSuite.scala @@ -287,7 +287,7 @@ class AutoCompactSuite extends "enough small files") { dir => withSQLConf( DeltaSQLConf.DELTA_AUTO_COMPACT_MIN_NUM_FILES.key -> "6", - DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "2000" + DeltaSQLConf.DELTA_AUTO_COMPACT_MAX_FILE_SIZE.key -> "20000" ) { AutoCompactPartitionStats.instance(spark).resetTestOnly() @@ -299,7 +299,7 @@ class AutoCompactSuite extends assert(deltaLog.update().numOfFiles === 4, "Should have 4 initial small files") // Second write - 4 large files - spark.range(1000).repartition(4).write.format("delta").mode("append").save(dir) + spark.range(10000).repartition(4).write.format("delta").mode("append").save(dir) val writeEvent = deltaLog.history.getHistory(Some(1)).head assert(writeEvent.operation === "WRITE", From 0058abeb55b9c82d9a325b7ded172b12be6f4ad4 Mon Sep 17 00:00:00 2001 From: Miles Cole Date: Thu, 20 Feb 2025 16:03:03 -0700 Subject: [PATCH 9/9] update description to match code Signed-off-by: Miles Cole --- .../spark/sql/delta/stats/AutoCompactPartitionStats.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala 
b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala index 6f316036bc..955916de5c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/AutoCompactPartitionStats.scala @@ -69,8 +69,7 @@ class AutoCompactPartitionStats( var wasAutoCompacted: Boolean = false) { /** - * Determine whether this partition can be autocompacted based on the number of small files and - * if this [[AutoCompactPartitionStats]] instance has not auto compacted it yet. + * Determine whether this partition can be autocompacted based on the number of small files. * @param minNumFiles The minimum number of files this table-partition should have to trigger * Auto Compaction in case it has already been compacted once. */
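
The net behavior after this series, shown below as a minimal standalone Scala sketch rather than the actual Delta Lake code (Confs, PartitionStat, and the method names are simplified stand-ins for the session confs and the per-partition stats class): when DELTA_AUTO_COMPACT_MIN_FILE_SIZE is unset, the small-file threshold defaults to half of DELTA_AUTO_COMPACT_MAX_FILE_SIZE so that large files no longer count towards minNumFiles, and a partition is selected for auto compaction based only on whether it currently holds at least minNumFiles small files.

// Minimal sketch of the post-series behavior; hypothetical names, not Delta Lake APIs.
object AutoCompactEligibilitySketch {

  // Stand-ins for the session confs; None models an unset config.
  final case class Confs(
      minFileSizeConf: Option[Long], // DELTA_AUTO_COMPACT_MIN_FILE_SIZE
      maxFileSizeConf: Long)         // DELTA_AUTO_COMPACT_MAX_FILE_SIZE

  // Patch 1: default minFileSize to half of maxFileSize instead of Long.MaxValue,
  // so only genuinely small files count as "small".
  def effectiveMinFileSize(confs: Confs): Long =
    confs.minFileSizeConf.getOrElse(confs.maxFileSizeConf / 2L)

  // Simplified per-partition state (numFiles = small files observed so far).
  final case class PartitionStat(numFiles: Long, wasAutoCompacted: Boolean)

  // Patches 1, 7, and 9: eligibility depends only on the small-file count;
  // whether the partition was auto compacted before no longer matters.
  def shouldAutoCompact(stat: PartitionStat, minNumFiles: Long): Boolean =
    stat.numFiles >= minNumFiles

  def main(args: Array[String]): Unit = {
    val confs = Confs(minFileSizeConf = None, maxFileSizeConf = 128L * 1024 * 1024)
    // Unset min file size falls back to 64 MiB (half of the 128 MiB max).
    println(effectiveMinFileSize(confs)) // 67108864
    println(shouldAutoCompact(PartitionStat(4, wasAutoCompacted = true), minNumFiles = 6)) // false
    println(shouldAutoCompact(PartitionStat(6, wasAutoCompacted = true), minNumFiles = 6)) // true
  }
}

For example, with a 128 MiB max file size and the min file size left unset, only files under 64 MiB count as small, and a partition holding six such files is compacted when minNumFiles is 6 even if it was already auto compacted earlier.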