From fd3069ab113c12a6dfa338abfd8c99acd707dfbd Mon Sep 17 00:00:00 2001 From: Nikola Mandic Date: Fri, 9 Aug 2024 21:50:48 +0800 Subject: [PATCH] [SPARK-49163][SQL] Attempt to create table based on broken parquet partition data results should return user-facing error ### What changes were proposed in this pull request? Create an example parquet table with partitions and insert data in Spark: ``` create table t(col1 string, col2 string, col3 string) using parquet location 'some/path/parquet-test' partitioned by (col1, col2); insert into t (col1, col2, col3) values ('a', 'b', 'c'); ``` Go into the `parquet-test` path in the filesystem and try to copy parquet data file from path `col1=a/col2=b` directory into `col1=a`. After that, try to create new table based on parquet data in Spark: ``` create table broken_table using parquet location 'some/path/parquet-test'; ``` This query errors with internal error. Stack trace excerpts: ``` org.apache.spark.SparkException: [INTERNAL_ERROR] Eagerly executed command failed. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace. SQLSTATE: XX000 ... Caused by: java.lang.AssertionError: assertion failed: Conflicting partition column names detected: Partition column name list #0: col1 Partition column name list #1: col1, col2For partitioned table directories, data files should only live in leaf directories. And directories at the same level should have the same partition column name. Please check the following directories for unexpected files or inconsistent partition column names: file:some/path/parquet-test/col1=a file:some/path/parquet-test/col1=a/col2=b at scala.Predef$.assert(Predef.scala:279) at org.apache.spark.sql.execution.datasources.PartitioningUtils$.resolvePartitions(PartitioningUtils.scala:391) ... ``` Fix this by changing internal error to user-facing error. ### Why are the changes needed? Replace internal error with user-facing one for valid sequence of Spark SQL operations. ### Does this PR introduce _any_ user-facing change? Yes, it presents the user with regular error instead of internal error. ### How was this patch tested? Added checks to `ParquetPartitionDiscoverySuite` which simulate the described scenario by manually breaking parquet table in the filesystem. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #47668 from nikolamand-db/SPARK-49163. Authored-by: Nikola Mandic Signed-off-by: Wenchen Fan --- .../resources/error/error-conditions.json | 11 ++ .../sql/errors/QueryExecutionErrors.scala | 12 ++ .../datasources/PartitioningUtils.scala | 20 ++-- .../datasources/FileIndexSuite.scala | 2 +- .../ParquetPartitionDiscoverySuite.scala | 110 ++++++++++++------ 5 files changed, 108 insertions(+), 47 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 4766c77909158..3512fe34e92a7 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -625,6 +625,17 @@ ], "sqlState" : "40000" }, + "CONFLICTING_PARTITION_COLUMN_NAMES" : { + "message" : [ + "Conflicting partition column names detected:", + "", + "For partitioned table directories, data files should only live in leaf directories.", + "And directories at the same level should have the same partition column name.", + "Please check the following directories for unexpected files or inconsistent partition column names:", + "" + ], + "sqlState" : "KD009" + }, "CONNECT" : { "message" : [ "Generic Spark Connect error." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 9bfb81ad821be..eb25387af5a72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2837,4 +2837,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE "parameter" -> toSQLId("unit"), "invalidValue" -> s"'$invalidValue'")) } + + def conflictingPartitionColumnNamesError( + distinctPartColLists: Seq[String], + suspiciousPaths: Seq[Path]): SparkRuntimeException = { + new SparkRuntimeException( + errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES", + messageParameters = Map( + "distinctPartColLists" -> distinctPartColLists.mkString("\n\t", "\n\t", "\n"), + "suspiciousPaths" -> suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") + ) + ) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 3b2d601b81fb5..676a2ab64d0a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -29,6 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path +import org.apache.spark.SparkRuntimeException import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec @@ -386,9 +387,9 @@ object PartitioningUtils extends SQLConfHelper { } else { pathsWithPartitionValues.map(_._2.columnNames.map(_.toLowerCase())) } - assert( - partColNames.distinct.size == 1, - listConflictingPartitionColumns(pathsWithPartitionValues)) + if (partColNames.distinct.size != 1) { + throw conflictingPartitionColumnsError(pathsWithPartitionValues) + } // Resolves possible type conflicts for each column val values = pathsWithPartitionValues.map(_._2) @@ -404,8 +405,8 @@ object PartitioningUtils extends SQLConfHelper { } } - private[datasources] def listConflictingPartitionColumns( - pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { + private[datasources] def conflictingPartitionColumnsError( + pathWithPartitionValues: Seq[(Path, PartitionValues)]): SparkRuntimeException = { val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = @@ -423,13 +424,8 @@ object PartitioningUtils extends SQLConfHelper { // Lists out those non-leaf partition directories that also contain files val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths) - s"Conflicting partition column names detected:\n" + - distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + - "For partitioned table directories, data files should only live in leaf directories.\n" + - "And directories at the same level should have the same partition column name.\n" + - "Please check the following directories for unexpected files or " + - "inconsistent partition column names:\n" + - suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") + QueryExecutionErrors.conflictingPartitionColumnNamesError( + distinctPartColLists, suspiciousPaths) } // scalastyle:off line.size.limit diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 6399eb6da049f..21623f94c8baf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -112,7 +112,7 @@ class FileIndexSuite extends SharedSparkSession { } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { - val msg = intercept[AssertionError] { + val msg = intercept[SparkRuntimeException] { val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) fileIndex.partitionSpec() }.getMessage diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index a6ad147c865d2..1484511a98b63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -27,7 +27,7 @@ import com.google.common.io.Files import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetOutputFormat -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkRuntimeException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils @@ -958,54 +958,58 @@ abstract class ParquetPartitionDiscoverySuite } } - test("listConflictingPartitionColumns") { - def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = { - val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => - s"\tPartition column name list #$index: $list" - }.mkString("\n", "\n", "\n") - - // scalastyle:off - s"""Conflicting partition column names detected: - |$conflictingColNameLists - |For partitioned table directories, data files should only live in leaf directories. - |And directories at the same level should have the same partition column name. - |Please check the following directories for unexpected files or inconsistent partition column names: - |${paths.map("\t" + _).mkString("\n", "\n", "")} - """.stripMargin.trim - // scalastyle:on - } - - assert( - listConflictingPartitionColumns( + test("conflictingPartitionColumnsError") { + checkError( + exception = conflictingPartitionColumnsError( Seq( (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))), (new Path("file:/tmp/foo/b=1"), - PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType)))))).trim === - makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1"))) + PartitionValues(Seq("b"), Seq(TypedPartValue("1", IntegerType)))) + ) + ), + errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES", + parameters = Map( + "distinctPartColLists" -> + "\n\tPartition column name list #0: a\n\tPartition column name list #1: b\n", + "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/b=1" + ) + ) - assert( - listConflictingPartitionColumns( + checkError( + exception = conflictingPartitionColumnsError( Seq( (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))), (new Path("file:/tmp/foo/a=1"), - PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))))).trim === - makeExpectedMessage( - Seq("a"), - Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1"))) + PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))) + ) + ), + errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES", + parameters = Map( + "distinctPartColLists" -> + "\n\tPartition column name list #0: a\n", + "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1/_temporary\n\tfile:/tmp/foo/a=1" + ) + ) - assert( - listConflictingPartitionColumns( + checkError( + exception = conflictingPartitionColumnsError( Seq( (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(TypedPartValue("1", IntegerType)))), (new Path("file:/tmp/foo/a=1/b=foo"), PartitionValues(Seq("a", "b"), - Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType)))))).trim === - makeExpectedMessage( - Seq("a", "a, b"), - Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo"))) + Seq(TypedPartValue("1", IntegerType), TypedPartValue("foo", StringType)))) + ) + ), + errorClass = "CONFLICTING_PARTITION_COLUMN_NAMES", + parameters = Map( + "distinctPartColLists" -> + "\n\tPartition column name list #0: a\n\tPartition column name list #1: a, b\n", + "suspiciousPaths" -> "\n\tfile:/tmp/foo/a=1\n\tfile:/tmp/foo/a=1/b=foo" + ) + ) } test("Parallel partition discovery") { @@ -1145,6 +1149,44 @@ abstract class ParquetPartitionDiscoverySuite checkAnswer(res, Seq(Row(1, 2, 3, 4.0f))) } } + + test("SPARK-49163: Attempt to create table based on broken parquet partition data") { + withTempDir { dir => + val data = Seq[(String, String, String)](("a", "b", "c")) + data + .toDF("col1", "col2", "col3") + .write + .mode("overwrite") + .partitionBy("col1", "col2") + .parquet(dir.getCanonicalPath) + + // Structure of parquet table in filesystem: + // + // +- col1=a + // +- col2=b + // |- part-00000.parquet + + val partition = new File(dir, "col1=a") + val dummyData = new File(partition, "dummy") + dummyData.createNewFile() + + // Structure of parquet table in filesystem is now corrupt: + // + // +- col1=a + // |- dummy + // +- col2=b + // |- part-00000.parquet + + val exception = intercept[SparkRuntimeException] { + spark.read.parquet(dir.toString) + } + val msg = exception.getMessage + assert(exception.getErrorClass === "CONFLICTING_PARTITION_COLUMN_NAMES") + // Partitions inside the error message can be presented in any order + assert("Partition column name list #[0-1]: col1".r.findFirstIn(msg).isDefined) + assert("Partition column name list #[0-1]: col1, col2".r.findFirstIn(msg).isDefined) + } + } } class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {