[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE TABLE SQL statement

## What changes were proposed in this pull request?
Currently, our CREATE TABLE syntax requires its optional clauses to appear in an exact order, which is hard to remember. This PR therefore makes the optional clauses order insensitive for the `CREATE TABLE` SQL statement.

```
CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1 col_type1 [COMMENT col_comment1], ...)]
    USING datasource
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [OPTIONS (key1=val1, key2=val2, ...)]
    [PARTITIONED BY (col_name1, col_name2, ...)]
    [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
    [LOCATION path]
    [COMMENT table_comment]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
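
For example, with this change a statement like the following, whose optional clauses appear in a different order from the canonical listing above, parses successfully. The table name, path, and property values are hypothetical and shown only for illustration.
```
-- Optional clauses in a non-canonical order; accepted after this change.
CREATE TABLE my_tab (id INT, name STRING)
USING parquet
COMMENT 'example table'
TBLPROPERTIES ('creator'='example')
LOCATION '/tmp/my_tab'
OPTIONS (compression='snappy')
PARTITIONED BY (name)
```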

The same idea also applies to the CREATE TABLE syntax for Hive tables.
```
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
    [(col_name1[:] col_type1 [COMMENT col_comment1], ...)]
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
    [AS select_statement]
```

The proposal is to make the following clauses order insensitive.
```
    [COMMENT table_comment]
    [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)]
    [ROW FORMAT row_format]
    [STORED AS file_format]
    [LOCATION path]
    [TBLPROPERTIES (key1=val1, key2=val2, ...)]
```
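
Likewise, a Hive-format table definition with its clauses reordered, such as the sketch below, would parse after this change. The table name and path are hypothetical.
```
-- Hive table with reordered optional clauses; accepted after this change.
CREATE EXTERNAL TABLE my_hive_tab (id INT)
COMMENT 'example Hive table'
STORED AS PARQUET
PARTITIONED BY (dt STRING)
LOCATION '/tmp/my_hive_tab'
TBLPROPERTIES ('creator'='example')
```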

## How was this patch tested?
Added test cases
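
Because the relaxed grammar also lets a clause appear more than once, the patch adds a `checkDuplicateClauses` helper that rejects repeated clauses. A sketch of a statement that now fails, using a hypothetical table name:
```
-- Repeating a clause now fails with "Found duplicate clauses: TBLPROPERTIES".
CREATE TABLE dup_tab (id INT)
USING parquet
TBLPROPERTIES ('a'='1')
COMMENT 'duplicate clause example'
TBLPROPERTIES ('b'='2')
```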

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20133 from gatorsmile/createDataSourceTableDDL.
gatorsmile committed Jan 3, 2018
1 parent 247a089 commit 1a87a16
Showing 7 changed files with 335 additions and 138 deletions.
@@ -73,18 +73,22 @@ statement
| ALTER DATABASE identifier SET DBPROPERTIES tablePropertyList #setDatabaseProperties
| DROP DATABASE (IF EXISTS)? identifier (RESTRICT | CASCADE)? #dropDatabase
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS options=tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
bucketSpec? locationSpec?
(COMMENT comment=STRING)?
(TBLPROPERTIES tableProps=tablePropertyList)?
((OPTIONS options=tablePropertyList) |
(PARTITIONED BY partitionColumnNames=identifierList) |
bucketSpec |
locationSpec |
(COMMENT comment=STRING) |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createTable
| createTableHeader ('(' columns=colTypeList ')')?
(COMMENT comment=STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
bucketSpec? skewSpec?
rowFormat? createFileFormat? locationSpec?
(TBLPROPERTIES tablePropertyList)?
((COMMENT comment=STRING) |
(PARTITIONED BY '(' partitionColumns=colTypeList ')') |
bucketSpec |
skewSpec |
rowFormat |
createFileFormat |
locationSpec |
(TBLPROPERTIES tableProps=tablePropertyList))*
(AS? query)? #createHiveTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier locationSpec? #createTableLike
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.catalyst.parser

import java.util

import scala.collection.mutable.StringBuilder

import org.antlr.v4.runtime.{ParserRuleContext, Token}
@@ -39,6 +41,13 @@ object ParserUtils {
throw new ParseException(s"Operation not allowed: $message", ctx)
}

def checkDuplicateClauses[T](
nodes: util.List[T], clauseName: String, ctx: ParserRuleContext): Unit = {
if (nodes.size() > 1) {
throw new ParseException(s"Found duplicate clauses: $clauseName", ctx)
}
}

/** Check if duplicate keys exist in a set of key-value pairs. */
def checkDuplicateKeys[T](keyPairs: Seq[(String, T)], ctx: ParserRuleContext): Unit = {
keyPairs.groupBy(_._1).filter(_._2.size > 1).foreach { case (key, _) =>
@@ -383,23 +383,34 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* {{{
* CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name
* USING table_provider
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
* create_table_clauses
* [[AS] select_statement];
*
* create_table_clauses (order insensitive):
* [OPTIONS table_property_list]
* [PARTITIONED BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
* ]
* [LOCATION path]
* [COMMENT table_comment]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
if (external) {
operationNotAllowed("CREATE EXTERNAL TABLE ... USING", ctx)
}

checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.OPTIONS, "OPTIONS", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val schema = Option(ctx.colTypeList()).map(createSchema)
@@ -408,9 +419,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)

val location = Option(ctx.locationSpec).map(visitLocationSpec)
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
val storage = DataSource.buildStorageFormatFromOptions(options)

if (location.isDefined && storage.locationUri.isDefined) {
@@ -1087,13 +1098,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1[:] data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
* [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* create_table_clauses
* [AS select_statement];
*
* create_table_clauses (order insensitive):
* [COMMENT table_comment]
* [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
* [ROW FORMAT row_format]
* [STORED AS file_format]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* }}}
*/
override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = withOrigin(ctx) {
@@ -1104,28 +1118,36 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
"CREATE TEMPORARY TABLE is not supported yet. " +
"Please use CREATE TEMPORARY VIEW as an alternative.", ctx)
}
if (ctx.skewSpec != null) {
if (ctx.skewSpec.size > 0) {
operationNotAllowed("CREATE TABLE ... SKEWED BY", ctx)
}

checkDuplicateClauses(ctx.TBLPROPERTIES, "TBLPROPERTIES", ctx)
checkDuplicateClauses(ctx.PARTITIONED, "PARTITIONED BY", ctx)
checkDuplicateClauses(ctx.COMMENT, "COMMENT", ctx)
checkDuplicateClauses(ctx.bucketSpec(), "CLUSTERED BY", ctx)
checkDuplicateClauses(ctx.createFileFormat, "STORED AS/BY", ctx)
checkDuplicateClauses(ctx.rowFormat, "ROW FORMAT", ctx)
checkDuplicateClauses(ctx.locationSpec, "LOCATION", ctx)

val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
val partitionCols = Option(ctx.partitionColumns).map(visitColTypeList).getOrElse(Nil)
val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
val properties = Option(ctx.tableProps).map(visitPropertyKeyValues).getOrElse(Map.empty)
val selectQuery = Option(ctx.query).map(plan)
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
val bucketSpec = ctx.bucketSpec().asScala.headOption.map(visitBucketSpec)

// Note: Hive requires partition columns to be distinct from the schema, so we need
// to include the partition columns here explicitly
val schema = StructType(dataCols ++ partitionCols)

// Storage format
val defaultStorage = HiveSerDe.getDefaultStorage(conf)
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
validateRowFormatFileFormat(ctx.rowFormat.asScala, ctx.createFileFormat.asScala, ctx)
val fileStorage = ctx.createFileFormat.asScala.headOption.map(visitCreateFileFormat)
.getOrElse(CatalogStorageFormat.empty)
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
val rowStorage = ctx.rowFormat.asScala.headOption.map(visitRowFormat)
.getOrElse(CatalogStorageFormat.empty)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
val location = ctx.locationSpec.asScala.headOption.map(visitLocationSpec)
// If we are creating an EXTERNAL table, then the LOCATION field is required
if (external && location.isEmpty) {
operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
@@ -1180,7 +1202,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
ctx)
}

val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
val hasStorageProperties = (ctx.createFileFormat.size != 0) || (ctx.rowFormat.size != 0)
if (conf.convertCTAS && !hasStorageProperties) {
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
@@ -1366,6 +1388,15 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
}

private def validateRowFormatFileFormat(
rowFormatCtx: Seq[RowFormatContext],
createFileFormatCtx: Seq[CreateFileFormatContext],
parentCtx: ParserRuleContext): Unit = {
if (rowFormatCtx.size == 1 && createFileFormatCtx.size == 1) {
validateRowFormatFileFormat(rowFormatCtx.head, createFileFormatCtx.head, parentCtx)
}
}

/**
* Create or replace a view. This creates a [[CreateViewCommand]] command.
*