From e5f7ced64ef23c82103bad9e86deb993e9eaed97 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun <dongjoon@apache.org>
Date: Tue, 4 Oct 2016 15:30:03 -0700
Subject: [PATCH] [SPARK-17612][SQL] Support `DESCRIBE table PARTITION` SQL syntax

---
 .../sql/catalyst/catalog/interface.scala      | 12 ++-
 .../spark/sql/execution/SparkSqlParser.scala  | 15 ++-
 .../spark/sql/execution/command/tables.scala  | 91 ++++++++++++++-----
 .../resources/sql-tests/inputs/describe.sql   | 27 ++++++
 .../sql-tests/results/describe.sql.out        | 90 ++++++++++++++++++
 .../sql/hive/execution/SQLQuerySuite.scala    | 78 +++++++++++++++-
 6 files changed, 287 insertions(+), 26 deletions(-)
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/describe.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/describe.sql.out

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index e7430b030901a..83428924b1600 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -108,7 +108,17 @@ case class CatalogColumn(
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat,
-    parameters: Map[String, String] = Map.empty)
+    parameters: Map[String, String] = Map.empty) {
+
+  override def toString: String = {
+    val output =
+      Seq(
+        s"Partition Values: [${spec.values.mkString(", ")}]",
+        s"$storage",
+        s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
+    output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
+  }
+}
 
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3072a6d79eac7..f092360a98bae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -278,13 +278,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * Create a [[DescribeTableCommand]] logical plan.
    */
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // Describe partition and column are not supported yet. Return null and let the parser decide
+    // Describe column is not supported yet. Return null and let the parser decide
     // what to do with this (create an exception or pass it on to a different system).
-    if (ctx.describeColName != null || ctx.partitionSpec != null) {
+    if (ctx.describeColName != null) {
       null
     } else {
+      val partitionSpec = if (ctx.partitionSpec != null) {
+        // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
+ visitPartitionSpec(ctx.partitionSpec).map { + case (key, Some(value)) => key -> value + case (key, _) => + throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx) + } + } else { + Map.empty[String, String] + } DescribeTableCommand( visitTableIdentifier(ctx.tableIdentifier), + partitionSpec, ctx.EXTENDED != null, ctx.FORMATTED != null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 995feb3b670ee..920b0ee61496c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -417,10 +417,14 @@ case class TruncateTableCommand( /** * Command that looks like * {{{ - * DESCRIBE [EXTENDED|FORMATTED] table_name; + * DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec?; * }}} */ -case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean) +case class DescribeTableCommand( + table: TableIdentifier, + partitionSpec: TablePartitionSpec, + isExtended: Boolean, + isFormatted: Boolean) extends RunnableCommand { override val output: Seq[Attribute] = Seq( @@ -438,25 +442,25 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF val catalog = sparkSession.sessionState.catalog if (catalog.isTemporaryTable(table)) { + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}") + } describeSchema(catalog.lookupRelation(table).schema, result) } else { val metadata = catalog.getTableMetadata(table) + describeSchema(metadata.schema, result) - if (DDLUtils.isDatasourceTable(metadata)) { - DDLUtils.getSchemaFromTableProperties(metadata) match { - case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result) - case None => describeSchema(catalog.lookupRelation(table).schema, result) - } - } else { - describeSchema(metadata.schema, result) - } + describePartitionInfo(metadata, result) - if (isExtended) { - describeExtended(metadata, result) - } else if (isFormatted) { - describeFormatted(metadata, result) + if (partitionSpec.isEmpty) { + if (isExtended) { + describeExtendedTableInfo(metadata, result) + } else if (isFormatted) { + describeFormattedTableInfo(metadata, result) + } } else { - describePartitionInfo(metadata, result) + describeDetailedPartitionInfo(catalog, metadata, result) } } @@ -481,16 +485,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF } } - private def describeExtended(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { - describePartitionInfo(table, buffer) - + private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = { append(buffer, "", "", "") append(buffer, "# Detailed Table Information", table.toString, "") } - private def describeFormatted(table: CatalogTable, buffer: 
ArrayBuffer[Row]): Unit = {
-    describePartitionInfo(table, buffer)
-
+  private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
     append(buffer, "", "", "")
     append(buffer, "# Detailed Table Information", "", "")
     append(buffer, "Database:", table.database, "")
@@ -548,6 +548,53 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
     }
   }
 
+  private def describeDetailedPartitionInfo(
+      catalog: SessionCatalog,
+      metadata: CatalogTable,
+      result: ArrayBuffer[Row]): Unit = {
+    if (metadata.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(
+        s"DESC PARTITION is not allowed on a view: ${table.identifier}")
+    }
+    if (DDLUtils.isDatasourceTable(metadata)) {
+      throw new AnalysisException(
+        s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
+    }
+    val partition = catalog.getPartition(table, partitionSpec)
+    if (isExtended) {
+      describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
+    } else if (isFormatted) {
+      describeFormattedDetailedPartitionInfo(table, metadata, partition, result)
+      describeStorageInfo(metadata, result)
+    }
+  }
+
+  private def describeExtendedDetailedPartitionInfo(
+      tableIdentifier: TableIdentifier,
+      table: CatalogTable,
+      partition: CatalogTablePartition,
+      buffer: ArrayBuffer[Row]): Unit = {
+    append(buffer, "", "", "")
+    append(buffer, "Detailed Partition Information " + partition.toString, "", "")
+  }
+
+  private def describeFormattedDetailedPartitionInfo(
+      tableIdentifier: TableIdentifier,
+      table: CatalogTable,
+      partition: CatalogTablePartition,
+      buffer: ArrayBuffer[Row]): Unit = {
+    append(buffer, "", "", "")
+    append(buffer, "# Detailed Partition Information", "", "")
+    append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
+    append(buffer, "Database:", table.database, "")
+    append(buffer, "Table:", tableIdentifier.table, "")
+    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Partition Parameters:", "", "")
+    partition.parameters.foreach { case (key, value) =>
+      append(buffer, s"  $key", value, "")
+    }
+  }
+
   private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
     schema.foreach { column =>
       append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe.sql b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
new file mode 100644
index 0000000000000..2e916d9edc73b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
@@ -0,0 +1,27 @@
+CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING);
+
+ALTER TABLE t ADD PARTITION (c='Us', d=1);
+
+DESC t;
+
+-- Ignore these because the results contain timestamps, e.g., `Create Table`.
+-- DESC EXTENDED t;
+-- DESC FORMATTED t;
+
+DESC t PARTITION (c='Us', d=1);
+
+-- Ignore these because the results contain timestamps, e.g., transient_lastDdlTime.
+-- DESC EXTENDED t PARTITION (c='Us', d=1);
+-- DESC FORMATTED t PARTITION (c='Us', d=1);
+
+-- NoSuchPartitionException: Partition not found in table
+DESC t PARTITION (c='Us', d=2);
+
+-- AnalysisException: Partition spec is invalid
+DESC t PARTITION (c='Us');
+
+-- ParseException: PARTITION specification is incomplete
+DESC t PARTITION (c='Us', d);
+
+-- DROP TEST TABLE
+DROP TABLE t;
\ No newline at end of file
diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
new file mode 100644
index 0000000000000..37bf303f1bfe4
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -0,0 +1,90 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 8
+
+
+-- !query 0
+CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+ALTER TABLE t ADD PARTITION (c='Us', d=1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+DESC t
+-- !query 2 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 2 output
+# Partition Information
+# col_name	data_type	comment
+a	string
+b	int
+c	string
+c	string
+d	string
+d	string
+
+
+-- !query 3
+DESC t PARTITION (c='Us', d=1)
+-- !query 3 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 3 output
+# Partition Information
+# col_name	data_type	comment
+a	string
+b	int
+c	string
+c	string
+d	string
+d	string
+
+
+-- !query 4
+DESC t PARTITION (c='Us', d=2)
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+Partition not found in table 't' database 'default':
+c -> Us
+d -> 2;
+
+
+-- !query 5
+DESC t PARTITION (c='Us')
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Partition spec is invalid. The spec (c) must match the partition spec (c, d) defined in table '`default`.`t`';
+
+
+-- !query 6
+DESC t PARTITION (c='Us', d)
+-- !query 6 schema
+struct<>
+-- !query 6 output
+org.apache.spark.sql.catalyst.parser.ParseException
+
+PARTITION specification is incomplete: `d`(line 1, pos 0)
+
+== SQL ==
+DESC t PARTITION (c='Us', d)
+^^^
+
+
+-- !query 7
+DROP TABLE t
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 51d49469b3f46..b421fae6dc57b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -26,7 +26,8 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry,
+  NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
@@ -342,6 +343,81 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("describe partition") {
+    withTable("partitioned_table") {
+      sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
+      sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
+
+      checkKeywordsExist(sql("DESC partitioned_table PARTITION (c='Us', d=1)"),
+        "# Partition Information",
+        "# col_name")
+
+      checkKeywordsExist(sql("DESC EXTENDED partitioned_table PARTITION (c='Us', d=1)"),
+        "# Partition Information",
+        "# col_name",
+        "Detailed Partition Information CatalogPartition(",
+        "Partition Values: [Us, 1]",
+        "Storage(Location:",
+        "Partition Parameters")
+
+      checkKeywordsExist(sql("DESC FORMATTED partitioned_table PARTITION (c='Us', d=1)"),
+        "# Partition Information",
+        "# col_name",
+        "# Detailed Partition Information",
+        "Partition Value:",
+        "Database:",
+        "Table:",
+        "Location:",
+        "Partition Parameters:",
+        "# Storage Information")
+    }
+  }
+
+  test("describe partition - error handling") {
+    withTable("partitioned_table", "datasource_table") {
+      sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
+      sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")
+
+      val m = intercept[NoSuchPartitionException] {
+        sql("DESC partitioned_table PARTITION (c='Us', d=2)")
+      }.getMessage()
+      assert(m.contains("Partition not found in table"))
+
+      val m2 = intercept[AnalysisException] {
+        sql("DESC partitioned_table PARTITION (c='Us')")
+      }.getMessage()
+      assert(m2.contains("Partition spec is invalid"))
+
+      val m3 = intercept[ParseException] {
+        sql("DESC partitioned_table PARTITION (c='Us', d)")
+      }.getMessage()
+      assert(m3.contains("PARTITION specification is incomplete: `d`"))
+
+      spark
+        .range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
+        .partitionBy("d")
+        .saveAsTable("datasource_table")
+      val m4 = intercept[AnalysisException] {
+        sql("DESC datasource_table PARTITION (d=2)")
+      }.getMessage()
+      assert(m4.contains("DESC PARTITION is not allowed on a datasource table"))
+
+      val m5 = intercept[AnalysisException] {
+        spark.range(10).select('id as 
'a, 'id as 'b).createTempView("view1") + sql("DESC view1 PARTITION (c='Us', d=1)") + }.getMessage() + assert(m5.contains("DESC PARTITION is not allowed on a temporary view")) + + withView("permanent_view") { + val m = intercept[AnalysisException] { + sql("CREATE VIEW permanent_view AS SELECT * FROM partitioned_table") + sql("DESC permanent_view PARTITION (c='Us', d=1)") + }.getMessage() + assert(m.contains("DESC PARTITION is not allowed on a view")) + } + } + } + test("SPARK-5371: union with null and sum") { val df = Seq((1, 1)).toDF("c1", "c2") df.createOrReplaceTempView("table1")
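
For reference, a minimal spark-shell sketch of the syntax this patch adds. It assumes a Hive-enabled SparkSession bound to `spark` (as in the SQLQuerySuite tests above); the table `t` and its partition columns mirror describe.sql and are illustrative only.

// Create a partitioned table and register one partition, as in describe.sql above.
spark.sql("CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
spark.sql("ALTER TABLE t ADD PARTITION (c='Us', d=1)")

// New in this patch: DESC/DESCRIBE accepts a PARTITION spec after the table name.
spark.sql("DESC t PARTITION (c='Us', d=1)").show(truncate = false)

// EXTENDED appends the CatalogTablePartition.toString block
// ("Detailed Partition Information CatalogPartition(...)"), while FORMATTED
// prints the field-per-row layout built by describeFormattedDetailedPartitionInfo,
// followed by the storage section.
spark.sql("DESC EXTENDED t PARTITION (c='Us', d=1)").show(truncate = false)
spark.sql("DESC FORMATTED t PARTITION (c='Us', d=1)").show(truncate = false)

An incomplete spec such as DESC t PARTITION (c='Us', d) is rejected in the parser ("PARTITION specification is incomplete"), a spec that does not cover every partition column fails analysis ("Partition spec is invalid"), and datasource tables, views, and temporary views raise the AnalysisExceptions exercised in the error-handling test above.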