[SPARK-17612][SQL] Support DESCRIBE table PARTITION SQL syntax
dongjoon-hyun committed Oct 4, 2016
1 parent a9165bb commit e5f7ced
Showing 6 changed files with 287 additions and 26 deletions.
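Before the diff, for orientation: the syntax this commit adds can be exercised end to end as in the sketch below. This is illustrative only; it assumes a SparkSession named `spark` (not part of this change) and mirrors the new sql-tests/inputs/describe.sql.

// Illustrative usage of the new DESC ... PARTITION syntax; assumes a
// SparkSession `spark`. Statements mirror sql-tests/inputs/describe.sql below.
spark.sql("CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
spark.sql("ALTER TABLE t ADD PARTITION (c='Us', d=1)")

// Plain DESC with a partition spec: schema plus partition columns,
// after validating that the partition exists.
spark.sql("DESC t PARTITION (c='Us', d=1)").show()

// EXTENDED appends a one-line detailed partition description;
// FORMATTED appends a multi-row "# Detailed Partition Information" section.
spark.sql("DESC EXTENDED t PARTITION (c='Us', d=1)").show(truncate = false)
spark.sql("DESC FORMATTED t PARTITION (c='Us', d=1)").show(truncate = false)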
@@ -108,7 +108,17 @@ case class CatalogColumn(
case class CatalogTablePartition(
spec: CatalogTypes.TablePartitionSpec,
storage: CatalogStorageFormat,
parameters: Map[String, String] = Map.empty)
parameters: Map[String, String] = Map.empty) {

override def toString: String = {
val output =
Seq(
s"Partition Values: [${spec.values.mkString(", ")}]",
s"$storage",
s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
}
}


/**
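A note on the hunk above: the new toString renders in the shape shown below. This is a self-contained mimic with plain values; the `storage` argument is a stand-in string for what CatalogStorageFormat.toString actually supplies.

// Standalone mimic of the new CatalogTablePartition.toString formatting;
// `storage` stands in for CatalogStorageFormat.toString, which supplies that line.
def partitionToString(
    spec: Map[String, String],
    storage: String,
    parameters: Map[String, String]): String = {
  val output = Seq(
    s"Partition Values: [${spec.values.mkString(", ")}]",
    storage,
    s"Partition Parameters:{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
  output.filter(_.nonEmpty).mkString("CatalogPartition(\n\t", "\n\t", ")")
}

// Prints, for example:
// CatalogPartition(
//     Partition Values: [Us, 1]
//     Storage(Location: ...)
//     Partition Parameters:{numFiles=1})
println(partitionToString(
  Map("c" -> "Us", "d" -> "1"), "Storage(Location: ...)", Map("numFiles" -> "1")))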
@@ -278,13 +278,24 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
* Create a [[DescribeTableCommand]] logical plan.
*/
override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
// Describe partition and column are not supported yet. Return null and let the parser decide
// Describe column is not supported yet. Return null and let the parser decide
// what to do with this (create an exception or pass it on to a different system).
if (ctx.describeColName != null || ctx.partitionSpec != null) {
if (ctx.describeColName != null) {
null
} else {
val partitionSpec = if (ctx.partitionSpec != null) {
// According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
visitPartitionSpec(ctx.partitionSpec).map {
case (key, Some(value)) => key -> value
case (key, _) =>
throw new ParseException(s"PARTITION specification is incomplete: `$key`", ctx)
}
} else {
Map.empty[String, String]
}
DescribeTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
partitionSpec,
ctx.EXTENDED != null,
ctx.FORMATTED != null)
}
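The spec flattening above is easy to check in isolation. Below is a hedged sketch of the same Map[String, Option[String]] to Map[String, String] conversion; a plain IllegalArgumentException stands in for Spark's ParseException, which needs a parser context.

// Mirrors the partitionSpec flattening in visitDescribeTable; a plain
// IllegalArgumentException stands in for ParseException here.
def flattenSpec(raw: Map[String, Option[String]]): Map[String, String] =
  raw.map {
    case (key, Some(value)) => key -> value
    case (key, None) =>
      throw new IllegalArgumentException(
        s"PARTITION specification is incomplete: `$key`")
  }

// flattenSpec(Map("c" -> Some("Us"), "d" -> Some("1"))) == Map("c" -> "Us", "d" -> "1")
// flattenSpec(Map("c" -> Some("Us"), "d" -> None)) throws on `d`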
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -417,10 +417,14 @@ case class TruncateTableCommand(
/**
* Command that looks like
* {{{
* DESCRIBE [EXTENDED|FORMATTED] table_name;
* DESCRIBE [EXTENDED|FORMATTED] table_name partitionSpec?;
* }}}
*/
case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isFormatted: Boolean)
case class DescribeTableCommand(
table: TableIdentifier,
partitionSpec: TablePartitionSpec,
isExtended: Boolean,
isFormatted: Boolean)
extends RunnableCommand {

override val output: Seq[Attribute] = Seq(
@@ -438,25 +442,25 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
val catalog = sparkSession.sessionState.catalog

if (catalog.isTemporaryTable(table)) {
if (partitionSpec.nonEmpty) {
throw new AnalysisException(
s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}")
}
describeSchema(catalog.lookupRelation(table).schema, result)
} else {
val metadata = catalog.getTableMetadata(table)
describeSchema(metadata.schema, result)

if (DDLUtils.isDatasourceTable(metadata)) {
DDLUtils.getSchemaFromTableProperties(metadata) match {
case Some(userSpecifiedSchema) => describeSchema(userSpecifiedSchema, result)
case None => describeSchema(catalog.lookupRelation(table).schema, result)
}
} else {
describeSchema(metadata.schema, result)
}
describePartitionInfo(metadata, result)

if (isExtended) {
describeExtended(metadata, result)
} else if (isFormatted) {
describeFormatted(metadata, result)
if (partitionSpec.isEmpty) {
if (isExtended) {
describeExtendedTableInfo(metadata, result)
} else if (isFormatted) {
describeFormattedTableInfo(metadata, result)
}
} else {
describePartitionInfo(metadata, result)
describeDetailedPartitionInfo(catalog, metadata, result)
}
}

Expand All @@ -481,16 +485,12 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}

private def describeExtended(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
describePartitionInfo(table, buffer)

private def describeExtendedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", table.toString, "")
}

private def describeFormatted(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
describePartitionInfo(table, buffer)

private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Table Information", "", "")
append(buffer, "Database:", table.database, "")
@@ -548,6 +548,53 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean, isF
}
}

private def describeDetailedPartitionInfo(
catalog: SessionCatalog,
metadata: CatalogTable,
result: ArrayBuffer[Row]): Unit = {
if (metadata.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
s"DESC PARTITION is not allowed on a view: ${table.identifier}")
}
if (DDLUtils.isDatasourceTable(metadata)) {
throw new AnalysisException(
s"DESC PARTITION is not allowed on a datasource table: ${table.identifier}")
}
val partition = catalog.getPartition(table, partitionSpec)
if (isExtended) {
describeExtendedDetailedPartitionInfo(table, metadata, partition, result)
} else if (isFormatted) {
describeFormattedDetailedPartitionInfo(table, metadata, partition, result)
describeStorageInfo(metadata, result)
}
}

private def describeExtendedDetailedPartitionInfo(
tableIdentifier: TableIdentifier,
table: CatalogTable,
partition: CatalogTablePartition,
buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "Detailed Partition Information " + partition.toString, "", "")
}

private def describeFormattedDetailedPartitionInfo(
tableIdentifier: TableIdentifier,
table: CatalogTable,
partition: CatalogTablePartition,
buffer: ArrayBuffer[Row]): Unit = {
append(buffer, "", "", "")
append(buffer, "# Detailed Partition Information", "", "")
append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
append(buffer, "Database:", table.database, "")
append(buffer, "Table:", tableIdentifier.table, "")
append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
append(buffer, "Partition Parameters:", "", "")
partition.parameters.foreach { case (key, value) =>
append(buffer, s" $key", value, "")
}
}

private def describeSchema(schema: Seq[CatalogColumn], buffer: ArrayBuffer[Row]): Unit = {
schema.foreach { column =>
append(buffer, column.name, column.dataType.toLowerCase, column.comment.orNull)
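Stepping back from the hunks above: run() now dispatches on two axes, whether a PARTITION spec is present and which of EXTENDED or FORMATTED was given. The sketch below is a simplified view of that dispatch, returning path names instead of appending result rows.

// Simplified view of DescribeTableCommand.run's dispatch after this change;
// returns the name of the path taken rather than appending result rows.
def describePath(
    hasPartitionSpec: Boolean,
    isExtended: Boolean,
    isFormatted: Boolean): String =
  if (!hasPartitionSpec) {
    if (isExtended) "describeExtendedTableInfo"
    else if (isFormatted) "describeFormattedTableInfo"
    else "schema and partition columns only"
  } else {
    // In the real code this branch first rejects views and datasource tables,
    // then resolves the partition via catalog.getPartition (which can throw
    // NoSuchPartitionException or an invalid-spec AnalysisException).
    if (isExtended) "describeExtendedDetailedPartitionInfo"
    else if (isFormatted) "describeFormattedDetailedPartitionInfo + describeStorageInfo"
    else "schema and partition columns only"
  }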
27 changes: 27 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/describe.sql
@@ -0,0 +1,27 @@
CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING);

ALTER TABLE t ADD PARTITION (c='Us', d=1);

DESC t;

-- Ignore these because the output contains timestamps, e.g., `Create Table`.
-- DESC EXTENDED t;
-- DESC FORMATTED t;

DESC t PARTITION (c='Us', d=1);

-- Ignore these because the output contains timestamps, e.g., transient_lastDdlTime.
-- DESC EXTENDED t PARTITION (c='Us', d=1);
-- DESC FORMATTED t PARTITION (c='Us', d=1);

-- NoSuchPartitionException: Partition not found in table
DESC t PARTITION (c='Us', d=2);

-- AnalysisException: Partition spec is invalid
DESC t PARTITION (c='Us');

-- ParseException: PARTITION specification is incomplete
DESC t PARTITION (c='Us', d);

-- DROP TEST TABLE
DROP TABLE t;
90 changes: 90 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -0,0 +1,90 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 8


-- !query 0
CREATE TABLE t (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
ALTER TABLE t ADD PARTITION (c='Us', d=1)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
DESC t
-- !query 2 schema
struct<col_name:string,data_type:string,comment:string>
-- !query 2 output
# Partition Information
# col_name data_type comment
a string
b int
c string
c string
d string
d string


-- !query 3
DESC t PARTITION (c='Us', d=1)
-- !query 3 schema
struct<col_name:string,data_type:string,comment:string>
-- !query 3 output
# Partition Information
# col_name data_type comment
a string
b int
c string
c string
d string
d string


-- !query 4
DESC t PARTITION (c='Us', d=2)
-- !query 4 schema
struct<>
-- !query 4 output
org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
Partition not found in table 't' database 'default':
c -> Us
d -> 2;


-- !query 5
DESC t PARTITION (c='Us')
-- !query 5 schema
struct<>
-- !query 5 output
org.apache.spark.sql.AnalysisException
Partition spec is invalid. The spec (c) must match the partition spec (c, d) defined in table '`default`.`t`';


-- !query 6
DESC t PARTITION (c='Us', d)
-- !query 6 schema
struct<>
-- !query 6 output
org.apache.spark.sql.catalyst.parser.ParseException

PARTITION specification is incomplete: `d`(line 1, pos 0)

== SQL ==
DESC t PARTITION (c='Us', d)
^^^


-- !query 7
DROP TABLE t
-- !query 7 schema
struct<>
-- !query 7 output

@@ -26,7 +26,8 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry,
NoSuchPartitionException}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils
@@ -342,6 +343,81 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("describe partition") {
withTable("partitioned_table") {
sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")

checkKeywordsExist(sql("DESC partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name")

checkKeywordsExist(sql("DESC EXTENDED partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name",
"Detailed Partition Information CatalogPartition(",
"Partition Values: [Us, 1]",
"Storage(Location:",
"Partition Parameters")

checkKeywordsExist(sql("DESC FORMATTED partitioned_table PARTITION (c='Us', d=1)"),
"# Partition Information",
"# col_name",
"# Detailed Partition Information",
"Partition Value:",
"Database:",
"Table:",
"Location:",
"Partition Parameters:",
"# Storage Information")
}
}

test("describe partition - error handling") {
withTable("partitioned_table", "datasource_table") {
sql("CREATE TABLE partitioned_table (a STRING, b INT) PARTITIONED BY (c STRING, d STRING)")
sql("ALTER TABLE partitioned_table ADD PARTITION (c='Us', d=1)")

val m = intercept[NoSuchPartitionException] {
sql("DESC partitioned_table PARTITION (c='Us', d=2)")
}.getMessage()
assert(m.contains("Partition not found in table"))

val m2 = intercept[AnalysisException] {
sql("DESC partitioned_table PARTITION (c='Us')")
}.getMessage()
assert(m2.contains("Partition spec is invalid"))

val m3 = intercept[ParseException] {
sql("DESC partitioned_table PARTITION (c='Us', d)")
}.getMessage()
assert(m3.contains("PARTITION specification is incomplete: `d`"))

spark
.range(1).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd).write
.partitionBy("d")
.saveAsTable("datasource_table")
val m4 = intercept[AnalysisException] {
sql("DESC datasource_table PARTITION (d=2)")
}.getMessage()
assert(m4.contains("DESC PARTITION is not allowed on a datasource table"))

val m5 = intercept[AnalysisException] {
spark.range(10).select('id as 'a, 'id as 'b).createTempView("view1")
sql("DESC view1 PARTITION (c='Us', d=1)")
}.getMessage()
assert(m5.contains("DESC PARTITION is not allowed on a temporary view"))

withView("permanent_view") {
val m = intercept[AnalysisException] {
sql("CREATE VIEW permanent_view AS SELECT * FROM partitioned_table")
sql("DESC permanent_view PARTITION (c='Us', d=1)")
}.getMessage()
assert(m.contains("DESC PARTITION is not allowed on a view"))
}
}
}

test("SPARK-5371: union with null and sum") {
val df = Seq((1, 1)).toDF("c1", "c2")
df.createOrReplaceTempView("table1")
