diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 4766c77909158..3512fe34e92a7 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -625,6 +625,17 @@
     ],
     "sqlState" : "40000"
   },
+  "CONFLICTING_PARTITION_COLUMN_NAMES" : {
+    "message" : [
+      "Conflicting partition column names detected:",
+      "<distinctPartColLists>",
+      "For partitioned table directories, data files should only live in leaf directories.",
+      "And directories at the same level should have the same partition column name.",
+      "Please check the following directories for unexpected files or inconsistent partition column names:",
+      "<suspiciousPaths>"
+    ],
+    "sqlState" : "KD009"
+  },
   "CONNECT" : {
     "message" : [
       "Generic Spark Connect error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9bfb81ad821be..eb25387af5a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
         "parameter" -> toSQLId("unit"),
         "invalidValue" -> s"'$invalidValue'"))
   }
+
+  def conflictingPartitionColumnNamesError(
+      distinctPartColLists: Seq[String],
+      suspiciousPaths: Seq[Path]): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      messageParameters = Map(
+        "distinctPartColLists" -> distinctPartColLists.mkString("\n\t", "\n\t", "\n"),
+        "suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+      )
+    )
+  }
 }
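Note on the message template: the `<distinctPartColLists>` and `<suspiciousPaths>` placeholders above are filled by the two `mkString` calls in the new helper. A standalone Scala sketch of the resulting layout (the list entries and paths here are invented for illustration, not taken from the patch):

```scala
// Each entry is rendered on its own tab-indented line, mirroring the
// mkString calls in conflictingPartitionColumnNamesError above.
object MessageLayoutDemo extends App {
  val distinctPartColLists = Seq(
    "Partition column name list #0: a",
    "Partition column name list #1: b")
  val suspiciousPaths = Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1")

  print(distinctPartColLists.mkString("\n\t", "\n\t", "\n"))
  print(suspiciousPaths.map("\t" + _).mkString("\n", "\n", ""))
}
```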
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 3b2d601b81fb5..676a2ab64d0a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -29,6 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -386,9 +387,9 @@
     } else {
       pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase()))
     }
-    assert(
-      partColNames.distinct.size == 1,
-      listConflictingPartitionColumns(pathsWithPartitionValues))
+    if (partColNames.distinct.size != 1) {
+      throw conflictingPartitionColumnsError(pathsWithPartitionValues)
+    }
 
     // Resolves possible type conflicts for each column
     val values = pathsWithPartitionValues.map(_._2)
@@ -404,8 +405,8 @@
     }
   }
 
-  private[datasources] def listConflictingPartitionColumns(
-      pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = {
+  private[datasources] def conflictingPartitionColumnsError(
+      pathWithPartitionValues: Seq[(Path, PartitionValues)]): SparkRuntimeException = {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
 
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
@@ -423,13 +424,8 @@ object PartitioningUtils extends SQLConfHelper {
     // Lists out those non-leaf partition directories that also contain files
     val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths)
 
-    s"Conflicting partition column names detected:\n" +
-      distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
-      "For partitioned table directories, data files should only live in leaf directories.\n" +
-      "And directories at the same level should have the same partition column name.\n" +
-      "Please check the following directories for unexpected files or " +
-      "inconsistent partition column names:\n" +
-      suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
+    QueryExecutionErrors.conflictingPartitionColumnNamesError(
+      distinctPartColLists, suspiciousPaths)
   }
 
   // scalastyle:off line.size.limit
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 6399eb6da049f..21623f94c8baf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession {
     }
 
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      val msg = intercept[AssertionError] {
+      val msg = intercept[SparkRuntimeException] {
         val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
         fileIndex.partitionSpec()
       }.getMessage
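With the `assert` replaced by a typed exception, callers can match on a stable error class instead of an opaque `AssertionError` (which can also be disabled at the JVM level). A hypothetical user-side sketch, assuming only the public `SparkThrowable` interface and its `getErrorClass` accessor; the helper name and probing approach are illustrative, not part of the patch:

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.SparkSession

// Hypothetical helper: probe a directory and report whether its partition
// layout triggers the new error class.
def hasConflictingPartitionColumns(spark: SparkSession, path: String): Boolean =
  try {
    // Schema and partition inference run eagerly when the relation is resolved.
    spark.read.parquet(path)
    false
  } catch {
    case e: SparkThrowable if e.getErrorClass == "CONFLICTING_PARTITION_COLUMN_NAMES" =>
      true
  }
```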
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index a6ad147c865d2..1484511a98b63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkRuntimeException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
@@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite
     }
   }
 
-  test("listConflictingPartitionColumns") {
-    def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
-      val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>
-        s"\tPartition column name list #$index: $list"
-      }.mkString("\n", "\n", "\n")
-
-      // scalastyle:off
-      s"""Conflicting partition column names detected:
-         |$conflictingColNameLists
-         |For partitioned table directories, data files should only live in leaf directories.
-         |And directories at the same level should have the same partition column name.
-         |Please check the following directories for unexpected files or inconsistent partition column names:
-         |${paths.map("\t" + _).mkString("\n", "\n", "")}
-       """.stripMargin.trim
-      // scalastyle:on
-    }
-
-    assert(
-      listConflictingPartitionColumns(
+  test("conflictingPartitionColumnsError") {
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/b=1"),
-            PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
-      makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1"))
+            PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n\tPartition column name list #1: b\n",
+        "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1"
+      )
+    )
 
-    assert(
-      listConflictingPartitionColumns(
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1/_temporary"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/a=1"),
-            PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))))).trim ===
-      makeExpectedMessage(
-        Seq("a"),
-        Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1")))
+            PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n",
+        "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1"
+      )
+    )
 
-    assert(
-      listConflictingPartitionColumns(
+    checkError(
+      exception = conflictingPartitionColumnsError(
         Seq(
           (new Path("file:/tmp/foo/a=1"),
             PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))),
           (new Path("file:/tmp/foo/a=1/b=foo"),
             PartitionValues(Seq("a", "b"),
-              Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType)))))).trim ===
-      makeExpectedMessage(
-        Seq("a", "a, b"),
-        Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo")))
+              Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType))))
+        )
+      ),
+      errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES",
+      parameters = Map(
+        "distinctPartColLists" ->
+          "\n\tPartition column name list #0: a\n\tPartition column name list #1: a, b\n",
+        "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo"
+      )
+    )
   }
 
   test("Parallel partition discovery") {
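The `distinctPartColLists` strings asserted above are produced by shaping logic in `PartitioningUtils` that lies outside these hunks. A simplified, self-contained sketch of how the `Partition column name list #N` entries are built, with invented input lists:

```scala
// The distinct column-name lists, e.g. recovered from /a=1 and /a=1/b=foo:
val distinctPartColNames: Seq[Seq[String]] = Seq(Seq("a"), Seq("a", "b"))

// Shape each list into the "#N: cols" form asserted in the test parameters.
val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
  case (cols, index) => s"Partition column name list #$index: $cols"
}
// => List("Partition column name list #0: a",
//         "Partition column name list #1: a, b")
```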
"CONFLICTING_PARTITION_COLUMN_NAMES") + // Partitions inside the error message can be presented in any order + assert("Partition column name list #[0-1]: col1".r.findFirstIn(msg).isDefined) + assert("Partition column name list #[0-1]: col1, col2".r.findFirstIn(msg).isDefined) + } + } } class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {