From 669368b17172d3a3abb913d7591ebdad9e7219f9 Mon Sep 17 00:00:00 2001 From: yumwang Date: Wed, 13 Apr 2022 11:19:07 +0800 Subject: [PATCH] [CARMEL-5931] Avoid file number exceeding spark.sql.dynamic.partition.maxCreatedFiles (#908) --- .../adaptive/OptimizeLocalShuffleReader.scala | 4 +++- .../adaptive/AdaptiveQueryExecSuite.scala | 20 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index c18c4dd705422..7e9aec1c18c8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -163,8 +163,10 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] { case ENSURE_REQUIREMENTS => s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) case REPARTITION_BY_NONE => + val partitionNums = s.mapStats.map(_.bytesByPartitionId.length) // Use LocalShuffleReader only when we can't CoalesceShufflePartitions - s.mapStats.exists(_.bytesByPartitionId.length == partitionSpecs.size) && + partitionNums.contains(partitionSpecs.size) && + partitionNums.forall(_ < conf.maxCreatedFilesInDynamicPartition / 2) && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 8347ac2b4c2f6..aae1e17b4e0f4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2177,4 +2177,24 @@ class AdaptiveQueryExecSuite } } + 
test("CARMEL-5931: Avoid file number exceeds spark.sql.dynamic.partition.maxCreatedFiles") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "5", + SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", + SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> "10", + SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "1") { + val query = "SELECT /*+ REPARTITION */ * FROM range(1, 100, 1, 100)" + val (_, adaptivePlan) = runAdaptiveAndVerifyResult(query) + collect(adaptivePlan) { + case r: CustomShuffleReaderExec => r + } match { + case Seq(customShuffleReader) => + assert(customShuffleReader.partitionSpecs.size === 5) + assert(!customShuffleReader.isLocalReader) + case _ => + fail("There should be a CustomShuffleReaderExec") + } + } + } }