From a935eb81f0e6cfe95d7b5ffd8edf9308dacd7f4d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 3 Jun 2015 17:50:39 +0800 Subject: [PATCH 1/4] Improves error message when conflicting partition columns are found --- .../org/apache/spark/sql/sources/PartitioningUtils.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index c6f535dde7676..2972c6142ce05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -189,8 +189,13 @@ private[sql] object PartitioningUtils { Seq.empty } else { assert(distinctPartitionsColNames.size == 1, { - val list = distinctPartitionsColNames.mkString("\t", "\n\t", "") - s"Conflicting partition column names detected:\n$list" + val list = distinctPartitionsColNames.map(_.mkString(", ")).zipWithIndex.map { + case (names, index) => + s"\tPartition column name list #$index: $names" + } + + s"Conflicting partition column names detected:\n${list.mkString("\n")}\n" + + "For partitioned table directories, data files should only live in leaf directories." }) // Resolves possible type conflicts for each column From 6b74dd8b2b140612afd758ac151f9061b042ba51 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 7 Jun 2015 16:51:36 +0800 Subject: [PATCH 2/4] Also lists suspicious non-leaf partition directories --- .../spark/sql/sources/PartitioningUtils.scala | 41 +++++++++++++------ 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index 2972c6142ce05..3c1741ff29547 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -84,7 +84,7 @@ private[sql] object PartitioningUtils { } else { // This dataset is partitioned. We need to check whether all partitions have the same // partition columns and resolve potential type conflicts. - val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2)) + val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) // Creates the StructType which represents the partition columns. val fields = { @@ -181,24 +181,41 @@ private[sql] object PartitioningUtils { * StringType * }}} */ - private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = { - // Column names of all partitions must match - val distinctPartitionsColNames = values.map(_.columnNames).distinct - - if (distinctPartitionsColNames.isEmpty) { + private[sql] def resolvePartitions( + pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = { + if (pathsWithPartitionValues.isEmpty) { Seq.empty } else { - assert(distinctPartitionsColNames.size == 1, { - val list = distinctPartitionsColNames.map(_.mkString(", ")).zipWithIndex.map { + val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct + + def listConflictingPartitionColumns: String = { + def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = + seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) + + val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map { + case (path, partValues) => partValues.columnNames -> path + }) + + val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { case (names, index) => - s"\tPartition column name list #$index: $names" + s"Partition column name list #$index: $names" } - s"Conflicting partition column names detected:\n${list.mkString("\n")}\n" + - "For partitioned table directories, data files should only live in leaf directories." - }) + // Lists out those non-leaf partition directories that also contain files + val suspiciousPaths = + distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths) + + s"Conflicting partition column names detected:\n" + + distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + + "For partitioned table directories, data files should only live in leaf directories. " + + "Please check the following directories for unexpected files:\n" + + suspiciousPaths.mkString("\n\t", "\n\t", "\n") + } + + assert(distinctPartColNames.size == 1, listConflictingPartitionColumns) // Resolves possible type conflicts for each column + val values = pathsWithPartitionValues.map(_._2) val columnCount = values.head.columnNames.size val resolvedValues = (0 until columnCount).map { i => resolveTypeConflicts(values.map(_.literals(i))) From a149250eeea5e3f4eddba53205b86ea92cd530c7 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 23 Jun 2015 23:10:29 -0700 Subject: [PATCH 3/4] Adds test case for the error message --- .../spark/sql/sources/PartitioningUtils.scala | 57 ++++++++++--------- .../ParquetPartitionDiscoverySuite.scala | 45 +++++++++++++++ 2 files changed, 76 insertions(+), 26 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index 3c1741ff29547..8b2a45d8e970a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -187,32 +187,9 @@ private[sql] object PartitioningUtils { Seq.empty } else { val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct - - def listConflictingPartitionColumns: String = { - def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = - seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) - - val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map { - case (path, partValues) => partValues.columnNames -> path - }) - - val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { - case (names, index) => - s"Partition column name list #$index: $names" - } - - // Lists out those non-leaf partition directories that also contain files - val suspiciousPaths = - distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths) - - s"Conflicting partition column names detected:\n" + - distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + - "For partitioned table directories, data files should only live in leaf directories. " + - "Please check the following directories for unexpected files:\n" + - suspiciousPaths.mkString("\n\t", "\n\t", "\n") - } - - assert(distinctPartColNames.size == 1, listConflictingPartitionColumns) + assert( + distinctPartColNames.size == 1, + listConflictingPartitionColumns(pathsWithPartitionValues)) // Resolves possible type conflicts for each column val values = pathsWithPartitionValues.map(_._2) @@ -228,6 +205,34 @@ private[sql] object PartitioningUtils { } } + private[sql] def listConflictingPartitionColumns( + pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { + val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct + + def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = + seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) + + val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { + case (path, partValues) => partValues.columnNames -> path + }) + + val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { + case (names, index) => + s"Partition column name list #$index: $names" + } + + // Lists out those non-leaf partition directories that also contain files + val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths) + + s"Conflicting partition column names detected:\n" + + distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + + "For partitioned table directories, data files should only live in leaf directories.\n" + + "And directories at the same level should have the same partition column name.\n" + + "Please check the following directories for unexpected files or " + + "inconsistent partition column names:\n" + + suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") + } + /** * Converts a string to a [[Literal]] with automatic type inference. Currently only supports * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 01df189d1f3be..971765bab9508 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -538,4 +538,49 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { checkAnswer(sqlContext.read.format("parquet").load(dir.getCanonicalPath), df) } } + + test("listConflictingPartitionColumns") { + def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]) = { + val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => + s"\tPartition column name list #$index: $list" + }.mkString("\n", "\n", "\n") + + // scalastyle:off + s"""Conflicting partition column names detected: + |$conflictingColNameLists + |For partitioned table directories, data files should only live in leaf directories. + |And directories at the same level should have the same partition column name. + |Please check the following directories for unexpected files or inconsistent partition column names: + |${paths.map("\t" + _).mkString("\n", "\n", "")} + """.stripMargin.trim + // scalastyle:on + } + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/b=1"), PartitionValues(Seq("b"), Seq(Literal(1)))))).trim === + makeExpectedMessage(Seq("a", "b"), Seq("file:/tmp/foo/a=1", "file:/tmp/foo/b=1"))) + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1/_temporary"), PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/a=1"), PartitionValues(Seq("a"), Seq(Literal(1)))))).trim === + makeExpectedMessage( + Seq("a"), + Seq("file:/tmp/foo/a=1/_temporary", "file:/tmp/foo/a=1"))) + + assert( + listConflictingPartitionColumns( + Seq( + (new Path("file:/tmp/foo/a=1"), + PartitionValues(Seq("a"), Seq(Literal(1)))), + (new Path("file:/tmp/foo/a=1/b=foo"), + PartitionValues(Seq("a", "b"), Seq(Literal(1), Literal("foo")))))).trim === + makeExpectedMessage( + Seq("a", "a, b"), + Seq("file:/tmp/foo/a=1", "file:/tmp/foo/a=1/b=foo"))) + } } From 7d05f2cb1d6c84e71f7e304e580d0a1e842121eb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 23 Jun 2015 23:58:13 -0700 Subject: [PATCH 4/4] Fixes Scala style issue --- .../spark/sql/parquet/ParquetPartitionDiscoverySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 971765bab9508..d0ebb11b063f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -540,7 +540,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest { } test("listConflictingPartitionColumns") { - def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]) = { + def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = { val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => s"\tPartition column name list #$index: $list" }.mkString("\n", "\n", "\n")