diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala similarity index 62% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala index 5643c58d9f847..70df7607a713f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala @@ -22,19 +22,25 @@ import java.util.Locale import scala.reflect.{classTag, ClassTag} +import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans +import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan +import org.apache.spark.sql.catalyst.expressions.JsonTuple import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.Project +import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, Project, ScriptTransformation} import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -// TODO: merge this with DDLSuite (SPARK-14441) -class DDLCommandSuite extends PlanTest { +class DDLParserSuite extends PlanTest with SharedSQLContext { private lazy val parser = new SparkSqlParser(new SQLConf) private def assertUnsupported(sql: String, containsThesePhrases: Seq[String] = Seq()): Unit = { @@ -56,6 +62,17 @@ class DDLCommandSuite extends PlanTest { } } + private def compareTransformQuery(sql: String, expected: LogicalPlan): Unit = { + val plan = parser.parsePlan(sql).asInstanceOf[ScriptTransformation].copy(ioschema = null) + comparePlans(plan, expected, checkAnalysis = false) + } + + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { + parser.parsePlan(sql).collect { + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + }.head + } + test("create database") { val sql = """ @@ -1046,4 +1063,553 @@ class DDLCommandSuite extends PlanTest { s"got ${other.getClass.getName}: $sql") } } + + test("Test CTAS #1") { + val s1 = + """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view + |COMMENT 'This is the staging page view table' + |STORED AS RCFILE + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src""".stripMargin + + val (desc, exists) = extractTableDesc(s1) + assert(exists) + assert(desc.identifier.database == Some("mydb")) + assert(desc.identifier.table == "page_view") + assert(desc.tableType == CatalogTableType.EXTERNAL) + assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) + assert(desc.schema.isEmpty) // will be populated later when the table is actually created + assert(desc.comment == Some("This is the staging page view table")) + // TODO will be SQLText + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + 
assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == + Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) + } + + test("Test CTAS #2") { + val s2 = + """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view + |COMMENT 'This is the staging page view table' + |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' + | OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' + |LOCATION '/user/external/page_view' + |TBLPROPERTIES ('p1'='v1', 'p2'='v2') + |AS SELECT * FROM src""".stripMargin + + val (desc, exists) = extractTableDesc(s2) + assert(exists) + assert(desc.identifier.database == Some("mydb")) + assert(desc.identifier.table == "page_view") + assert(desc.tableType == CatalogTableType.EXTERNAL) + assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) + assert(desc.schema.isEmpty) // will be populated later when the table is actually created + // TODO will be SQLText + assert(desc.comment == Some("This is the staging page view table")) + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.storage.properties == Map()) + assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) + assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) + assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) + assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) + } + + test("Test CTAS #3") { + val s3 = """CREATE TABLE page_view AS SELECT * FROM src""" + val (desc, exists) = extractTableDesc(s3) + assert(exists == false) + assert(desc.identifier.database == None) + assert(desc.identifier.table == "page_view") + assert(desc.tableType == CatalogTableType.MANAGED) + assert(desc.storage.locationUri == None) + assert(desc.schema.isEmpty) + assert(desc.viewText == None) // TODO will be SQLText + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.storage.properties == Map()) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + assert(desc.properties == Map()) + } + + test("Test CTAS #4") { + val s4 = + """CREATE TABLE page_view + |STORED BY 'storage.handler.class.name' AS SELECT * FROM src""".stripMargin + intercept[AnalysisException] { + extractTableDesc(s4) + } + } + + test("Test CTAS #5") { + val s5 = """CREATE TABLE ctas2 + | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" + | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2") + | STORED AS RCFile + | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22") + | AS + | SELECT key, value + | FROM src + | ORDER BY key, value""".stripMargin + val (desc, exists) = extractTableDesc(s5) + assert(exists == false) + assert(desc.identifier.database == None) + assert(desc.identifier.table == "ctas2") + assert(desc.tableType == CatalogTableType.MANAGED) 
+ assert(desc.storage.locationUri == None) + assert(desc.schema.isEmpty) + assert(desc.viewText == None) // TODO will be SQLText + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) + assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) + } + + test("CTAS statement with a PARTITIONED BY clause is not allowed") { + assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" + + " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp") + } + + test("CTAS statement with schema") { + assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT * FROM src") + assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT 1, 'hello'") + } + + test("unsupported operations") { + intercept[ParseException] { + parser.parsePlan( + """ + |CREATE TEMPORARY TABLE ctas2 + |ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" + |WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2") + |STORED AS RCFile + |TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22") + |AS SELECT key, value FROM src ORDER BY key, value + """.stripMargin) + } + intercept[ParseException] { + parser.parsePlan( + """ + |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING) + |CLUSTERED BY(user_id) INTO 256 BUCKETS + |AS SELECT key, value FROM src ORDER BY key, value + """.stripMargin) + } + intercept[ParseException] { + parser.parsePlan( + """ + |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING) + |SKEWED BY (key) ON (1,5,6) + |AS SELECT key, value FROM src ORDER BY key, value + """.stripMargin) + } + intercept[ParseException] { + parser.parsePlan( + """ + |SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' + |RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader' + |FROM testData + """.stripMargin) + } + } + + test("Invalid interval term should throw AnalysisException") { + def assertError(sql: String, errorMessage: String): Unit = { + val e = intercept[AnalysisException] { + parser.parsePlan(sql) + } + assert(e.getMessage.contains(errorMessage)) + } + assertError("select interval '42-32' year to month", + "month 32 outside range [0, 11]") + assertError("select interval '5 49:12:15' day to second", + "hour 49 outside range [0, 23]") + assertError("select interval '.1111111111' second", + "nanosecond 1111111111 outside range") + } + + test("use native json_tuple instead of hive's UDTF in LATERAL VIEW") { + val analyzer = spark.sessionState.analyzer + val plan = analyzer.execute(parser.parsePlan( + """ + |SELECT * + |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test + |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b + """.stripMargin)) + + assert(plan.children.head.asInstanceOf[Generate].generator.isInstanceOf[JsonTuple]) + } + + test("transform query spec") { + val p = ScriptTransformation( + Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b")), + "func", Seq.empty, plans.table("e"), null) + + compareTransformQuery("select transform(a, b) using 'func' from e where f < 10", + p.copy(child = 
p.child.where('f < 10), output = Seq('key.string, 'value.string))) + compareTransformQuery("map a, b using 'func' as c, d from e", + p.copy(output = Seq('c.string, 'd.string))) + compareTransformQuery("reduce a, b using 'func' as (c int, d decimal(10, 0)) from e", + p.copy(output = Seq('c.int, 'd.decimal(10, 0)))) + } + + test("use backticks in output of Script Transform") { + parser.parsePlan( + """SELECT `t`.`thing1` + |FROM (SELECT TRANSFORM (`parquet_t1`.`key`, `parquet_t1`.`value`) + |USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t + """.stripMargin) + } + + test("use backticks in output of Generator") { + parser.parsePlan( + """ + |SELECT `gentab2`.`gencol2` + |FROM `default`.`src` + |LATERAL VIEW explode(array(array(1, 2, 3))) `gentab1` AS `gencol1` + |LATERAL VIEW explode(`gentab1`.`gencol1`) `gentab2` AS `gencol2` + """.stripMargin) + } + + test("use escaped backticks in output of Generator") { + parser.parsePlan( + """ + |SELECT `gen``tab2`.`gen``col2` + |FROM `default`.`src` + |LATERAL VIEW explode(array(array(1, 2, 3))) `gen``tab1` AS `gen``col1` + |LATERAL VIEW explode(`gen``tab1`.`gen``col1`) `gen``tab2` AS `gen``col2` + """.stripMargin) + } + + test("create table - basic") { + val query = "CREATE TABLE my_table (id int, name string)" + val (desc, allowExisting) = extractTableDesc(query) + assert(!allowExisting) + assert(desc.identifier.database.isEmpty) + assert(desc.identifier.table == "my_table") + assert(desc.tableType == CatalogTableType.MANAGED) + assert(desc.schema == new StructType().add("id", "int").add("name", "string")) + assert(desc.partitionColumnNames.isEmpty) + assert(desc.bucketSpec.isEmpty) + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.storage.locationUri.isEmpty) + assert(desc.storage.inputFormat == + Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(desc.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + assert(desc.storage.properties.isEmpty) + assert(desc.properties.isEmpty) + assert(desc.comment.isEmpty) + } + + test("create table - with database name") { + val query = "CREATE TABLE dbx.my_table (id int, name string)" + val (desc, _) = extractTableDesc(query) + assert(desc.identifier.database == Some("dbx")) + assert(desc.identifier.table == "my_table") + } + + test("create table - temporary") { + val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)" + val e = intercept[ParseException] { parser.parsePlan(query) } + assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet")) + } + + test("create table - external") { + val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'" + val (desc, _) = extractTableDesc(query) + assert(desc.tableType == CatalogTableType.EXTERNAL) + assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere"))) + } + + test("create table - if not exists") { + val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)" + val (_, allowExisting) = extractTableDesc(query) + assert(allowExisting) + } + + test("create table - comment") { + val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'" + val (desc, _) = extractTableDesc(query) + assert(desc.comment == Some("its hot as hell below")) + } + + test("create table - partitioned columns") { + val query = "CREATE TABLE 
my_table (id int, name string) PARTITIONED BY (month int)" + val (desc, _) = extractTableDesc(query) + assert(desc.schema == new StructType() + .add("id", "int") + .add("name", "string") + .add("month", "int")) + assert(desc.partitionColumnNames == Seq("month")) + } + + test("create table - clustered by") { + val numBuckets = 10 + val bucketedColumn = "id" + val sortColumn = "id" + val baseQuery = + s""" + CREATE TABLE my_table ( + $bucketedColumn int, + name string) + CLUSTERED BY($bucketedColumn) + """ + + val query1 = s"$baseQuery INTO $numBuckets BUCKETS" + val (desc1, _) = extractTableDesc(query1) + assert(desc1.bucketSpec.isDefined) + val bucketSpec1 = desc1.bucketSpec.get + assert(bucketSpec1.numBuckets == numBuckets) + assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn)) + assert(bucketSpec1.sortColumnNames.isEmpty) + + val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS" + val (desc2, _) = extractTableDesc(query2) + assert(desc2.bucketSpec.isDefined) + val bucketSpec2 = desc2.bucketSpec.get + assert(bucketSpec2.numBuckets == numBuckets) + assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn)) + assert(bucketSpec2.sortColumnNames.head.equals(sortColumn)) + } + + test("create table - skewed by") { + val baseQuery = "CREATE TABLE my_table (id int, name string) SKEWED BY" + val query1 = s"$baseQuery(id) ON (1, 10, 100)" + val query2 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z'))" + val query3 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z')) STORED AS DIRECTORIES" + val e1 = intercept[ParseException] { parser.parsePlan(query1) } + val e2 = intercept[ParseException] { parser.parsePlan(query2) } + val e3 = intercept[ParseException] { parser.parsePlan(query3) } + assert(e1.getMessage.contains("Operation not allowed")) + assert(e2.getMessage.contains("Operation not allowed")) + assert(e3.getMessage.contains("Operation not allowed")) + } + + test("create table - row format") { + val baseQuery = "CREATE TABLE my_table (id int, name string) ROW FORMAT" + val query1 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff'" + val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')" + val query3 = + s""" + |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y' + |COLLECTION ITEMS TERMINATED BY 'a' + |MAP KEYS TERMINATED BY 'b' + |LINES TERMINATED BY '\n' + |NULL DEFINED AS 'c' + """.stripMargin + val (desc1, _) = extractTableDesc(query1) + val (desc2, _) = extractTableDesc(query2) + val (desc3, _) = extractTableDesc(query3) + assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc1.storage.properties.isEmpty) + assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc2.storage.properties == Map("k1" -> "v1")) + assert(desc3.storage.properties == Map( + "field.delim" -> "x", + "escape.delim" -> "y", + "serialization.format" -> "x", + "line.delim" -> "\n", + "colelction.delim" -> "a", // yes, it's a typo from Hive :) + "mapkey.delim" -> "b")) + } + + test("create table - file format") { + val baseQuery = "CREATE TABLE my_table (id int, name string) STORED AS" + val query1 = s"$baseQuery INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'" + val query2 = s"$baseQuery ORC" + val (desc1, _) = extractTableDesc(query1) + val (desc2, _) = extractTableDesc(query2) + assert(desc1.storage.inputFormat == Some("winput")) + assert(desc1.storage.outputFormat == Some("wowput")) + assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + 
assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + } + + test("create table - storage handler") { + val baseQuery = "CREATE TABLE my_table (id int, name string) STORED BY" + val query1 = s"$baseQuery 'org.papachi.StorageHandler'" + val query2 = s"$baseQuery 'org.mamachi.StorageHandler' WITH SERDEPROPERTIES ('k1'='v1')" + val e1 = intercept[ParseException] { parser.parsePlan(query1) } + val e2 = intercept[ParseException] { parser.parsePlan(query2) } + assert(e1.getMessage.contains("Operation not allowed")) + assert(e2.getMessage.contains("Operation not allowed")) + } + + test("create table - properties") { + val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" + val (desc, _) = extractTableDesc(query) + assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) + } + + test("create table - everything!") { + val query = + """ + |CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id int, name string) + |COMMENT 'no comment' + |PARTITIONED BY (month int) + |ROW FORMAT SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1') + |STORED AS INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput' + |LOCATION '/path/to/mercury' + |TBLPROPERTIES ('k1'='v1', 'k2'='v2') + """.stripMargin + val (desc, allowExisting) = extractTableDesc(query) + assert(allowExisting) + assert(desc.identifier.database == Some("dbx")) + assert(desc.identifier.table == "my_table") + assert(desc.tableType == CatalogTableType.EXTERNAL) + assert(desc.schema == new StructType() + .add("id", "int") + .add("name", "string") + .add("month", "int")) + assert(desc.partitionColumnNames == Seq("month")) + assert(desc.bucketSpec.isEmpty) + assert(desc.viewText.isEmpty) + assert(desc.viewDefaultDatabase.isEmpty) + assert(desc.viewQueryColumnNames.isEmpty) + assert(desc.storage.locationUri == Some(new URI("/path/to/mercury"))) + assert(desc.storage.inputFormat == Some("winput")) + assert(desc.storage.outputFormat == Some("wowput")) + assert(desc.storage.serde == Some("org.apache.poof.serde.Baff")) + assert(desc.storage.properties == Map("k1" -> "v1")) + assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) + assert(desc.comment == Some("no comment")) + } + + test("create view -- basic") { + val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1" + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(!command.allowExisting) + assert(command.name.database.isEmpty) + assert(command.name.table == "view1") + assert(command.originalText == Some("SELECT * FROM tab1")) + assert(command.userSpecifiedColumns.isEmpty) + } + + test("create view - full") { + val v1 = + """ + |CREATE OR REPLACE VIEW view1 + |(col1, col3 COMMENT 'hello') + |COMMENT 'BLABLA' + |TBLPROPERTIES('prop1Key'="prop1Val") + |AS SELECT * FROM tab1 + """.stripMargin + val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] + assert(command.name.database.isEmpty) + assert(command.name.table == "view1") + assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello"))) + assert(command.originalText == Some("SELECT * FROM tab1")) + assert(command.properties == Map("prop1Key" -> "prop1Val")) + assert(command.comment == Some("BLABLA")) + } + + test("create view -- partitioned view") { + val v1 = "CREATE VIEW view1 partitioned on (ds, hr) as select * from srcpart" + 
intercept[ParseException] { + parser.parsePlan(v1) + } + } + + test("MSCK REPAIR table") { + val sql = "MSCK REPAIR TABLE tab1" + val parsed = parser.parsePlan(sql) + val expected = AlterTableRecoverPartitionsCommand( + TableIdentifier("tab1", None), + "MSCK REPAIR TABLE") + comparePlans(parsed, expected) + } + + test("create table like") { + val v1 = "CREATE TABLE table1 LIKE table2" + val (target, source, location, exists) = parser.parsePlan(v1).collect { + case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + }.head + assert(exists == false) + assert(target.database.isEmpty) + assert(target.table == "table1") + assert(source.database.isEmpty) + assert(source.table == "table2") + assert(location.isEmpty) + + val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2" + val (target2, source2, location2, exists2) = parser.parsePlan(v2).collect { + case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + }.head + assert(exists2) + assert(target2.database.isEmpty) + assert(target2.table == "table1") + assert(source2.database.isEmpty) + assert(source2.table == "table2") + assert(location2.isEmpty) + + val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'" + val (target3, source3, location3, exists3) = parser.parsePlan(v3).collect { + case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + }.head + assert(!exists3) + assert(target3.database.isEmpty) + assert(target3.table == "table1") + assert(source3.database.isEmpty) + assert(source3.table == "table2") + assert(location3 == Some("/spark/warehouse")) + + val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'" + val (target4, source4, location4, exists4) = parser.parsePlan(v4).collect { + case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) + }.head + assert(exists4) + assert(target4.database.isEmpty) + assert(target4.table == "table1") + assert(source4.database.isEmpty) + assert(source4.table == "table2") + assert(location4 == Some("/spark/warehouse")) + } + + test("load data") { + val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1" + val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect { + case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) + }.head + assert(table.database.isEmpty) + assert(table.table == "table1") + assert(path == "path") + assert(!isLocal) + assert(!isOverwrite) + assert(partition.isEmpty) + + val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')" + val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect { + case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) + }.head + assert(table2.database.isEmpty) + assert(table2.table == "table1") + assert(path2 == "path") + assert(isLocal2) + assert(isOverwrite2) + assert(partition2.nonEmpty) + assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2") + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala deleted file mode 100644 index bee470d8e1382..0000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ /dev/null @@ -1,739 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.net.URI -import java.util.Locale - -import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans -import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan -import org.apache.spark.sql.catalyst.expressions.JsonTuple -import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, ScriptTransformation} -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.CreateTable -import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StructType - -class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingleton { - val parser = TestHive.sessionState.sqlParser - - private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { - parser.parsePlan(sql).collect { - case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) - }.head - } - - private def assertUnsupported(sql: String): Unit = { - val e = intercept[ParseException] { - parser.parsePlan(sql) - } - assert(e.getMessage.toLowerCase(Locale.ROOT).contains("operation not allowed")) - } - - private def analyzeCreateTable(sql: String): CatalogTable = { - TestHive.sessionState.analyzer.execute(parser.parsePlan(sql)).collect { - case CreateTableCommand(tableDesc, _) => tableDesc - }.head - } - - private def compareTransformQuery(sql: String, expected: LogicalPlan): Unit = { - val plan = parser.parsePlan(sql).asInstanceOf[ScriptTransformation].copy(ioschema = null) - comparePlans(plan, expected, checkAnalysis = false) - } - - test("Test CTAS #1") { - val s1 = - """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view - |COMMENT 'This is the staging page view table' - |STORED AS RCFILE - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src""".stripMargin - - val (desc, exists) = extractTableDesc(s1) - assert(exists) - assert(desc.identifier.database == Some("mydb")) - assert(desc.identifier.table == "page_view") - assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) - assert(desc.schema.isEmpty) // will be populated later when the table is actually created - assert(desc.comment == Some("This is the staging page view table")) - // TODO will be SQLText - assert(desc.viewText.isEmpty) - 
assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.partitionColumnNames.isEmpty) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.storage.serde == - Some("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe")) - assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) - } - - test("Test CTAS #2") { - val s2 = - """CREATE EXTERNAL TABLE IF NOT EXISTS mydb.page_view - |COMMENT 'This is the staging page view table' - |ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe' - | STORED AS - | INPUTFORMAT 'parquet.hive.DeprecatedParquetInputFormat' - | OUTPUTFORMAT 'parquet.hive.DeprecatedParquetOutputFormat' - |LOCATION '/user/external/page_view' - |TBLPROPERTIES ('p1'='v1', 'p2'='v2') - |AS SELECT * FROM src""".stripMargin - - val (desc, exists) = extractTableDesc(s2) - assert(exists) - assert(desc.identifier.database == Some("mydb")) - assert(desc.identifier.table == "page_view") - assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some(new URI("/user/external/page_view"))) - assert(desc.schema.isEmpty) // will be populated later when the table is actually created - // TODO will be SQLText - assert(desc.comment == Some("This is the staging page view table")) - assert(desc.viewText.isEmpty) - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.partitionColumnNames.isEmpty) - assert(desc.storage.properties == Map()) - assert(desc.storage.inputFormat == Some("parquet.hive.DeprecatedParquetInputFormat")) - assert(desc.storage.outputFormat == Some("parquet.hive.DeprecatedParquetOutputFormat")) - assert(desc.storage.serde == Some("parquet.hive.serde.ParquetHiveSerDe")) - assert(desc.properties == Map("p1" -> "v1", "p2" -> "v2")) - } - - test("Test CTAS #3") { - val s3 = """CREATE TABLE page_view AS SELECT * FROM src""" - val (desc, exists) = extractTableDesc(s3) - assert(exists == false) - assert(desc.identifier.database == None) - assert(desc.identifier.table == "page_view") - assert(desc.tableType == CatalogTableType.MANAGED) - assert(desc.storage.locationUri == None) - assert(desc.schema.isEmpty) - assert(desc.viewText == None) // TODO will be SQLText - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.storage.properties == Map()) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - assert(desc.properties == Map()) - } - - test("Test CTAS #4") { - val s4 = - """CREATE TABLE page_view - |STORED BY 'storage.handler.class.name' AS SELECT * FROM src""".stripMargin - intercept[AnalysisException] { - extractTableDesc(s4) - } - } - - test("Test CTAS #5") { - val s5 = """CREATE TABLE ctas2 - | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" - | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2") - | STORED AS RCFile - | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22") - | AS - | SELECT key, value - | FROM src - | ORDER BY key, value""".stripMargin - val (desc, exists) = extractTableDesc(s5) - assert(exists == false) - assert(desc.identifier.database == None) - assert(desc.identifier.table == "ctas2") - 
assert(desc.tableType == CatalogTableType.MANAGED) - assert(desc.storage.locationUri == None) - assert(desc.schema.isEmpty) - assert(desc.viewText == None) // TODO will be SQLText - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.storage.properties == Map(("serde_p1" -> "p1"), ("serde_p2" -> "p2"))) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")) - assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe")) - assert(desc.properties == Map(("tbl_p1" -> "p11"), ("tbl_p2" -> "p22"))) - } - - test("CTAS statement with a PARTITIONED BY clause is not allowed") { - assertUnsupported(s"CREATE TABLE ctas1 PARTITIONED BY (k int)" + - " AS SELECT key, value FROM (SELECT 1 as key, 2 as value) tmp") - } - - test("CTAS statement with schema") { - assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT * FROM src") - assertUnsupported(s"CREATE TABLE ctas1 (age INT, name STRING) AS SELECT 1, 'hello'") - } - - test("unsupported operations") { - intercept[ParseException] { - parser.parsePlan( - """ - |CREATE TEMPORARY TABLE ctas2 - |ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe" - |WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2") - |STORED AS RCFile - |TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22") - |AS SELECT key, value FROM src ORDER BY key, value - """.stripMargin) - } - intercept[ParseException] { - parser.parsePlan( - """ - |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING) - |CLUSTERED BY(user_id) INTO 256 BUCKETS - |AS SELECT key, value FROM src ORDER BY key, value - """.stripMargin) - } - intercept[ParseException] { - parser.parsePlan( - """ - |CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING) - |SKEWED BY (key) ON (1,5,6) - |AS SELECT key, value FROM src ORDER BY key, value - """.stripMargin) - } - intercept[ParseException] { - parser.parsePlan( - """ - |SELECT TRANSFORM (key, value) USING 'cat' AS (tKey, tValue) - |ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.TypedBytesSerDe' - |RECORDREADER 'org.apache.hadoop.hive.contrib.util.typedbytes.TypedBytesRecordReader' - |FROM testData - """.stripMargin) - } - } - - test("Invalid interval term should throw AnalysisException") { - def assertError(sql: String, errorMessage: String): Unit = { - val e = intercept[AnalysisException] { - parser.parsePlan(sql) - } - assert(e.getMessage.contains(errorMessage)) - } - assertError("select interval '42-32' year to month", - "month 32 outside range [0, 11]") - assertError("select interval '5 49:12:15' day to second", - "hour 49 outside range [0, 23]") - assertError("select interval '.1111111111' second", - "nanosecond 1111111111 outside range") - } - - test("use native json_tuple instead of hive's UDTF in LATERAL VIEW") { - val analyzer = TestHive.sparkSession.sessionState.analyzer - val plan = analyzer.execute(parser.parsePlan( - """ - |SELECT * - |FROM (SELECT '{"f1": "value1", "f2": 12}' json) test - |LATERAL VIEW json_tuple(json, 'f1', 'f2') jt AS a, b - """.stripMargin)) - - assert(plan.children.head.asInstanceOf[Generate].generator.isInstanceOf[JsonTuple]) - } - - test("transform query spec") { - val p = ScriptTransformation( - Seq(UnresolvedAttribute("a"), UnresolvedAttribute("b")), - "func", Seq.empty, plans.table("e"), null) - - compareTransformQuery("select 
transform(a, b) using 'func' from e where f < 10", - p.copy(child = p.child.where('f < 10), output = Seq('key.string, 'value.string))) - compareTransformQuery("map a, b using 'func' as c, d from e", - p.copy(output = Seq('c.string, 'd.string))) - compareTransformQuery("reduce a, b using 'func' as (c int, d decimal(10, 0)) from e", - p.copy(output = Seq('c.int, 'd.decimal(10, 0)))) - } - - test("use backticks in output of Script Transform") { - parser.parsePlan( - """SELECT `t`.`thing1` - |FROM (SELECT TRANSFORM (`parquet_t1`.`key`, `parquet_t1`.`value`) - |USING 'cat' AS (`thing1` int, `thing2` string) FROM `default`.`parquet_t1`) AS t - """.stripMargin) - } - - test("use backticks in output of Generator") { - parser.parsePlan( - """ - |SELECT `gentab2`.`gencol2` - |FROM `default`.`src` - |LATERAL VIEW explode(array(array(1, 2, 3))) `gentab1` AS `gencol1` - |LATERAL VIEW explode(`gentab1`.`gencol1`) `gentab2` AS `gencol2` - """.stripMargin) - } - - test("use escaped backticks in output of Generator") { - parser.parsePlan( - """ - |SELECT `gen``tab2`.`gen``col2` - |FROM `default`.`src` - |LATERAL VIEW explode(array(array(1, 2, 3))) `gen``tab1` AS `gen``col1` - |LATERAL VIEW explode(`gen``tab1`.`gen``col1`) `gen``tab2` AS `gen``col2` - """.stripMargin) - } - - test("create table - basic") { - val query = "CREATE TABLE my_table (id int, name string)" - val (desc, allowExisting) = extractTableDesc(query) - assert(!allowExisting) - assert(desc.identifier.database.isEmpty) - assert(desc.identifier.table == "my_table") - assert(desc.tableType == CatalogTableType.MANAGED) - assert(desc.schema == new StructType().add("id", "int").add("name", "string")) - assert(desc.partitionColumnNames.isEmpty) - assert(desc.bucketSpec.isEmpty) - assert(desc.viewText.isEmpty) - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.storage.locationUri.isEmpty) - assert(desc.storage.inputFormat == - Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(desc.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - assert(desc.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - assert(desc.storage.properties.isEmpty) - assert(desc.properties.isEmpty) - assert(desc.comment.isEmpty) - } - - test("create table - with database name") { - val query = "CREATE TABLE dbx.my_table (id int, name string)" - val (desc, _) = extractTableDesc(query) - assert(desc.identifier.database == Some("dbx")) - assert(desc.identifier.table == "my_table") - } - - test("create table - temporary") { - val query = "CREATE TEMPORARY TABLE tab1 (id int, name string)" - val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.message.contains("CREATE TEMPORARY TABLE is not supported yet")) - } - - test("create table - external") { - val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'" - val (desc, _) = extractTableDesc(query) - assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some(new URI("/path/to/nowhere"))) - } - - test("create table - if not exists") { - val query = "CREATE TABLE IF NOT EXISTS tab1 (id int, name string)" - val (_, allowExisting) = extractTableDesc(query) - assert(allowExisting) - } - - test("create table - comment") { - val query = "CREATE TABLE my_table (id int, name string) COMMENT 'its hot as hell below'" - val (desc, _) = extractTableDesc(query) - assert(desc.comment == Some("its hot as hell below")) - } - - 
test("create table - partitioned columns") { - val query = "CREATE TABLE my_table (id int, name string) PARTITIONED BY (month int)" - val (desc, _) = extractTableDesc(query) - assert(desc.schema == new StructType() - .add("id", "int") - .add("name", "string") - .add("month", "int")) - assert(desc.partitionColumnNames == Seq("month")) - } - - test("create table - clustered by") { - val numBuckets = 10 - val bucketedColumn = "id" - val sortColumn = "id" - val baseQuery = - s""" - CREATE TABLE my_table ( - $bucketedColumn int, - name string) - CLUSTERED BY($bucketedColumn) - """ - - val query1 = s"$baseQuery INTO $numBuckets BUCKETS" - val (desc1, _) = extractTableDesc(query1) - assert(desc1.bucketSpec.isDefined) - val bucketSpec1 = desc1.bucketSpec.get - assert(bucketSpec1.numBuckets == numBuckets) - assert(bucketSpec1.bucketColumnNames.head.equals(bucketedColumn)) - assert(bucketSpec1.sortColumnNames.isEmpty) - - val query2 = s"$baseQuery SORTED BY($sortColumn) INTO $numBuckets BUCKETS" - val (desc2, _) = extractTableDesc(query2) - assert(desc2.bucketSpec.isDefined) - val bucketSpec2 = desc2.bucketSpec.get - assert(bucketSpec2.numBuckets == numBuckets) - assert(bucketSpec2.bucketColumnNames.head.equals(bucketedColumn)) - assert(bucketSpec2.sortColumnNames.head.equals(sortColumn)) - } - - test("create table - skewed by") { - val baseQuery = "CREATE TABLE my_table (id int, name string) SKEWED BY" - val query1 = s"$baseQuery(id) ON (1, 10, 100)" - val query2 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z'))" - val query3 = s"$baseQuery(id, name) ON ((1, 'x'), (2, 'y'), (3, 'z')) STORED AS DIRECTORIES" - val e1 = intercept[ParseException] { parser.parsePlan(query1) } - val e2 = intercept[ParseException] { parser.parsePlan(query2) } - val e3 = intercept[ParseException] { parser.parsePlan(query3) } - assert(e1.getMessage.contains("Operation not allowed")) - assert(e2.getMessage.contains("Operation not allowed")) - assert(e3.getMessage.contains("Operation not allowed")) - } - - test("create table - row format") { - val baseQuery = "CREATE TABLE my_table (id int, name string) ROW FORMAT" - val query1 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff'" - val query2 = s"$baseQuery SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1')" - val query3 = - s""" - |$baseQuery DELIMITED FIELDS TERMINATED BY 'x' ESCAPED BY 'y' - |COLLECTION ITEMS TERMINATED BY 'a' - |MAP KEYS TERMINATED BY 'b' - |LINES TERMINATED BY '\n' - |NULL DEFINED AS 'c' - """.stripMargin - val (desc1, _) = extractTableDesc(query1) - val (desc2, _) = extractTableDesc(query2) - val (desc3, _) = extractTableDesc(query3) - assert(desc1.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc1.storage.properties.isEmpty) - assert(desc2.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc2.storage.properties == Map("k1" -> "v1")) - assert(desc3.storage.properties == Map( - "field.delim" -> "x", - "escape.delim" -> "y", - "serialization.format" -> "x", - "line.delim" -> "\n", - "colelction.delim" -> "a", // yes, it's a typo from Hive :) - "mapkey.delim" -> "b")) - } - - test("create table - file format") { - val baseQuery = "CREATE TABLE my_table (id int, name string) STORED AS" - val query1 = s"$baseQuery INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput'" - val query2 = s"$baseQuery ORC" - val (desc1, _) = extractTableDesc(query1) - val (desc2, _) = extractTableDesc(query2) - assert(desc1.storage.inputFormat == Some("winput")) - assert(desc1.storage.outputFormat == Some("wowput")) - 
assert(desc1.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - assert(desc2.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) - assert(desc2.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - assert(desc2.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - } - - test("create table - storage handler") { - val baseQuery = "CREATE TABLE my_table (id int, name string) STORED BY" - val query1 = s"$baseQuery 'org.papachi.StorageHandler'" - val query2 = s"$baseQuery 'org.mamachi.StorageHandler' WITH SERDEPROPERTIES ('k1'='v1')" - val e1 = intercept[ParseException] { parser.parsePlan(query1) } - val e2 = intercept[ParseException] { parser.parsePlan(query2) } - assert(e1.getMessage.contains("Operation not allowed")) - assert(e2.getMessage.contains("Operation not allowed")) - } - - test("create table - properties") { - val query = "CREATE TABLE my_table (id int, name string) TBLPROPERTIES ('k1'='v1', 'k2'='v2')" - val (desc, _) = extractTableDesc(query) - assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) - } - - test("create table - everything!") { - val query = - """ - |CREATE EXTERNAL TABLE IF NOT EXISTS dbx.my_table (id int, name string) - |COMMENT 'no comment' - |PARTITIONED BY (month int) - |ROW FORMAT SERDE 'org.apache.poof.serde.Baff' WITH SERDEPROPERTIES ('k1'='v1') - |STORED AS INPUTFORMAT 'winput' OUTPUTFORMAT 'wowput' - |LOCATION '/path/to/mercury' - |TBLPROPERTIES ('k1'='v1', 'k2'='v2') - """.stripMargin - val (desc, allowExisting) = extractTableDesc(query) - assert(allowExisting) - assert(desc.identifier.database == Some("dbx")) - assert(desc.identifier.table == "my_table") - assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.schema == new StructType() - .add("id", "int") - .add("name", "string") - .add("month", "int")) - assert(desc.partitionColumnNames == Seq("month")) - assert(desc.bucketSpec.isEmpty) - assert(desc.viewText.isEmpty) - assert(desc.viewDefaultDatabase.isEmpty) - assert(desc.viewQueryColumnNames.isEmpty) - assert(desc.storage.locationUri == Some(new URI("/path/to/mercury"))) - assert(desc.storage.inputFormat == Some("winput")) - assert(desc.storage.outputFormat == Some("wowput")) - assert(desc.storage.serde == Some("org.apache.poof.serde.Baff")) - assert(desc.storage.properties == Map("k1" -> "v1")) - assert(desc.properties == Map("k1" -> "v1", "k2" -> "v2")) - assert(desc.comment == Some("no comment")) - } - - test("create view -- basic") { - val v1 = "CREATE VIEW view1 AS SELECT * FROM tab1" - val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] - assert(!command.allowExisting) - assert(command.name.database.isEmpty) - assert(command.name.table == "view1") - assert(command.originalText == Some("SELECT * FROM tab1")) - assert(command.userSpecifiedColumns.isEmpty) - } - - test("create view - full") { - val v1 = - """ - |CREATE OR REPLACE VIEW view1 - |(col1, col3 COMMENT 'hello') - |COMMENT 'BLABLA' - |TBLPROPERTIES('prop1Key'="prop1Val") - |AS SELECT * FROM tab1 - """.stripMargin - val command = parser.parsePlan(v1).asInstanceOf[CreateViewCommand] - assert(command.name.database.isEmpty) - assert(command.name.table == "view1") - assert(command.userSpecifiedColumns == Seq("col1" -> None, "col3" -> Some("hello"))) - assert(command.originalText == Some("SELECT * FROM tab1")) - assert(command.properties == Map("prop1Key" -> "prop1Val")) - assert(command.comment == Some("BLABLA")) - } - - test("create view -- partitioned view") { 
- val v1 = "CREATE VIEW view1 partitioned on (ds, hr) as select * from srcpart" - intercept[ParseException] { - parser.parsePlan(v1) - } - } - - test("MSCK REPAIR table") { - val sql = "MSCK REPAIR TABLE tab1" - val parsed = parser.parsePlan(sql) - val expected = AlterTableRecoverPartitionsCommand( - TableIdentifier("tab1", None), - "MSCK REPAIR TABLE") - comparePlans(parsed, expected) - } - - test("create table like") { - val v1 = "CREATE TABLE table1 LIKE table2" - val (target, source, location, exists) = parser.parsePlan(v1).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(exists == false) - assert(target.database.isEmpty) - assert(target.table == "table1") - assert(source.database.isEmpty) - assert(source.table == "table2") - assert(location.isEmpty) - - val v2 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2" - val (target2, source2, location2, exists2) = parser.parsePlan(v2).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(exists2) - assert(target2.database.isEmpty) - assert(target2.table == "table1") - assert(source2.database.isEmpty) - assert(source2.table == "table2") - assert(location2.isEmpty) - - val v3 = "CREATE TABLE table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target3, source3, location3, exists3) = parser.parsePlan(v3).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(!exists3) - assert(target3.database.isEmpty) - assert(target3.table == "table1") - assert(source3.database.isEmpty) - assert(source3.table == "table2") - assert(location3 == Some("/spark/warehouse")) - - val v4 = "CREATE TABLE IF NOT EXISTS table1 LIKE table2 LOCATION '/spark/warehouse'" - val (target4, source4, location4, exists4) = parser.parsePlan(v4).collect { - case CreateTableLikeCommand(t, s, l, allowExisting) => (t, s, l, allowExisting) - }.head - assert(exists4) - assert(target4.database.isEmpty) - assert(target4.table == "table1") - assert(source4.database.isEmpty) - assert(source4.table == "table2") - assert(location4 == Some("/spark/warehouse")) - } - - test("load data") { - val v1 = "LOAD DATA INPATH 'path' INTO TABLE table1" - val (table, path, isLocal, isOverwrite, partition) = parser.parsePlan(v1).collect { - case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) - }.head - assert(table.database.isEmpty) - assert(table.table == "table1") - assert(path == "path") - assert(!isLocal) - assert(!isOverwrite) - assert(partition.isEmpty) - - val v2 = "LOAD DATA LOCAL INPATH 'path' OVERWRITE INTO TABLE table1 PARTITION(c='1', d='2')" - val (table2, path2, isLocal2, isOverwrite2, partition2) = parser.parsePlan(v2).collect { - case LoadDataCommand(t, path, l, o, partition) => (t, path, l, o, partition) - }.head - assert(table2.database.isEmpty) - assert(table2.table == "table1") - assert(path2 == "path") - assert(isLocal2) - assert(isOverwrite2) - assert(partition2.nonEmpty) - assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2") - } - - test("Test the default fileformat for Hive-serde tables") { - withSQLConf("hive.default.fileformat" -> "orc") { - val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)") - assert(exists) - assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) - assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) - assert(desc.storage.serde 
== Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) - } - - withSQLConf("hive.default.fileformat" -> "parquet") { - val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)") - assert(exists) - val input = desc.storage.inputFormat - val output = desc.storage.outputFormat - val serde = desc.storage.serde - assert(input == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) - assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - } - } - - test("table name with schema") { - // regression test for SPARK-11778 - spark.sql("create schema usrdb") - spark.sql("create table usrdb.test(c int)") - spark.read.table("usrdb.test") - spark.sql("drop table usrdb.test") - spark.sql("drop schema usrdb") - } - - test("SPARK-15887: hive-site.xml should be loaded") { - assert(hiveClient.getConf("hive.in.test", "") == "true") - } - - test("create hive serde table with new syntax - basic") { - val sql = - """ - |CREATE TABLE t - |(id int, name string COMMENT 'blabla') - |USING hive - |OPTIONS (fileFormat 'parquet', my_prop 1) - |LOCATION '/tmp/file' - |COMMENT 'BLABLA' - """.stripMargin - - val table = analyzeCreateTable(sql) - assert(table.schema == new StructType() - .add("id", "int") - .add("name", "string", nullable = true, comment = "blabla")) - assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) - assert(table.storage.locationUri == Some(new URI("/tmp/file"))) - assert(table.storage.properties == Map("my_prop" -> "1")) - assert(table.comment == Some("BLABLA")) - - assert(table.storage.inputFormat == - Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) - assert(table.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) - assert(table.storage.serde == - Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) - } - - test("create hive serde table with new syntax - with partition and bucketing") { - val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)" - val table = analyzeCreateTable(v1) - assert(table.schema == new StructType().add("c1", "int").add("c2", "int")) - assert(table.partitionColumnNames == Seq("c2")) - // check the default formats - assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) - assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) - assert(table.storage.outputFormat == - Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) - - val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS" - val e2 = intercept[AnalysisException](analyzeCreateTable(v2)) - assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet")) - - val v3 = - """ - |CREATE TABLE t (c1 int, c2 int) USING hive - |PARTITIONED BY (c2) - |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin - val e3 = intercept[AnalysisException](analyzeCreateTable(v3)) - assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet")) - } - - test("create hive serde table with new syntax - Hive options error checking") { - val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')" - val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1)) - assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat")) - - val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " + - 
"(fileFormat 'x', inputFormat 'a', outputFormat 'b')" - val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2)) - assert(e2.getMessage.contains( - "Cannot specify fileFormat and inputFormat/outputFormat together")) - - val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')" - val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3)) - assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde")) - - val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')" - val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4)) - assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde")) - - val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')" - val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5)) - assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat")) - - val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')" - val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6)) - assert(e6.getMessage.contains( - "Cannot specify delimiters as they are only compatible with fileFormat 'textfile'")) - - // The value of 'fileFormat' option is case-insensitive. - val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')" - val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7)) - assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) - - val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')" - val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8)) - assert(e8.getMessage.contains("invalid fileFormat: 'wrong'")) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/TestHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/TestHiveSuite.scala index 193fa83dbad99..72f8e8ff7c688 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/TestHiveSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/TestHiveSuite.scala @@ -42,4 +42,8 @@ class TestHiveSuite extends TestHiveSingleton with SQLTestUtils { } testHiveSparkSession.reset() } + + test("SPARK-15887: hive-site.xml should be loaded") { + assert(hiveClient.getConf("hive.in.test", "") == "true") + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 4c2fea3eb68bc..ee64bc9f9ee04 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1998,4 +1998,15 @@ class HiveDDLSuite sq.stop() } } + + test("table name with schema") { + // regression test for SPARK-11778 + withDatabase("usrdb") { + spark.sql("create schema usrdb") + withTable("usrdb.test") { + spark.sql("create table usrdb.test(c int)") + spark.read.table("usrdb.test") + } + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala index 7803ac39e508b..1c9f00141ae1d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSerDeSuite.scala @@ -17,15 +17,23 @@ package org.apache.spark.sql.hive.execution +import java.net.URI + import 
org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.execution.metric.InputOutputMetricsHelper import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.types.StructType /** * A set of tests that validates support for Hive SerDe. */ -class HiveSerDeSuite extends HiveComparisonTest with BeforeAndAfterAll { +class HiveSerDeSuite extends HiveComparisonTest with PlanTest with BeforeAndAfterAll { override def beforeAll(): Unit = { import TestHive._ import org.apache.hadoop.hive.serde2.RegexSerDe @@ -60,4 +68,127 @@ class HiveSerDeSuite extends HiveComparisonTest with BeforeAndAfterAll { val serdeinsRes = InputOutputMetricsHelper.run(sql("select * from serdeins").toDF()) assert(serdeinsRes === (serdeinsCnt, 0L, serdeinsCnt) :: Nil) } + + private def extractTableDesc(sql: String): (CatalogTable, Boolean) = { + TestHive.sessionState.sqlParser.parsePlan(sql).collect { + case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore) + }.head + } + + private def analyzeCreateTable(sql: String): CatalogTable = { + TestHive.sessionState.analyzer.execute(TestHive.sessionState.sqlParser.parsePlan(sql)).collect { + case CreateTableCommand(tableDesc, _) => tableDesc + }.head + } + + test("Test the default fileformat for Hive-serde tables") { + withSQLConf("hive.default.fileformat" -> "orc") { + val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)") + assert(exists) + assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) + assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")) + assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")) + } + + withSQLConf("hive.default.fileformat" -> "parquet") { + val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)") + assert(exists) + val input = desc.storage.inputFormat + val output = desc.storage.outputFormat + val serde = desc.storage.serde + assert(input == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + } + } + + test("create hive serde table with new syntax - basic") { + val sql = + """ + |CREATE TABLE t + |(id int, name string COMMENT 'blabla') + |USING hive + |OPTIONS (fileFormat 'parquet', my_prop 1) + |LOCATION '/tmp/file' + |COMMENT 'BLABLA' + """.stripMargin + + val table = analyzeCreateTable(sql) + assert(table.schema == new StructType() + .add("id", "int") + .add("name", "string", nullable = true, comment = "blabla")) + assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) + assert(table.storage.locationUri == Some(new URI("/tmp/file"))) + assert(table.storage.properties == Map("my_prop" -> "1")) + assert(table.comment == Some("BLABLA")) + + assert(table.storage.inputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")) + assert(table.storage.serde == + 
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) + } + + test("create hive serde table with new syntax - with partition and bucketing") { + val v1 = "CREATE TABLE t (c1 int, c2 int) USING hive PARTITIONED BY (c2)" + val table = analyzeCreateTable(v1) + assert(table.schema == new StructType().add("c1", "int").add("c2", "int")) + assert(table.partitionColumnNames == Seq("c2")) + // check the default formats + assert(table.storage.serde == Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) + assert(table.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat")) + assert(table.storage.outputFormat == + Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")) + + val v2 = "CREATE TABLE t (c1 int, c2 int) USING hive CLUSTERED BY (c2) INTO 4 BUCKETS" + val e2 = intercept[AnalysisException](analyzeCreateTable(v2)) + assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet")) + + val v3 = + """ + |CREATE TABLE t (c1 int, c2 int) USING hive + |PARTITIONED BY (c2) + |CLUSTERED BY (c2) INTO 4 BUCKETS""".stripMargin + val e3 = intercept[AnalysisException](analyzeCreateTable(v3)) + assert(e3.message.contains("Creating bucketed Hive serde table is not supported yet")) + } + + test("create hive serde table with new syntax - Hive options error checking") { + val v1 = "CREATE TABLE t (c1 int) USING hive OPTIONS (inputFormat 'abc')" + val e1 = intercept[IllegalArgumentException](analyzeCreateTable(v1)) + assert(e1.getMessage.contains("Cannot specify only inputFormat or outputFormat")) + + val v2 = "CREATE TABLE t (c1 int) USING hive OPTIONS " + + "(fileFormat 'x', inputFormat 'a', outputFormat 'b')" + val e2 = intercept[IllegalArgumentException](analyzeCreateTable(v2)) + assert(e2.getMessage.contains( + "Cannot specify fileFormat and inputFormat/outputFormat together")) + + val v3 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', serde 'a')" + val e3 = intercept[IllegalArgumentException](analyzeCreateTable(v3)) + assert(e3.getMessage.contains("fileFormat 'parquet' already specifies a serde")) + + val v4 = "CREATE TABLE t (c1 int) USING hive OPTIONS (serde 'a', fieldDelim ' ')" + val e4 = intercept[IllegalArgumentException](analyzeCreateTable(v4)) + assert(e4.getMessage.contains("Cannot specify delimiters with a custom serde")) + + val v5 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fieldDelim ' ')" + val e5 = intercept[IllegalArgumentException](analyzeCreateTable(v5)) + assert(e5.getMessage.contains("Cannot specify delimiters without fileFormat")) + + val v6 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'parquet', fieldDelim ' ')" + val e6 = intercept[IllegalArgumentException](analyzeCreateTable(v6)) + assert(e6.getMessage.contains( + "Cannot specify delimiters as they are only compatible with fileFormat 'textfile'")) + + // The value of 'fileFormat' option is case-insensitive. + val v7 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'TEXTFILE', lineDelim ',')" + val e7 = intercept[IllegalArgumentException](analyzeCreateTable(v7)) + assert(e7.getMessage.contains("Hive data source only support newline '\\n' as line delimiter")) + + val v8 = "CREATE TABLE t (c1 int) USING hive OPTIONS (fileFormat 'wrong')" + val e8 = intercept[IllegalArgumentException](analyzeCreateTable(v8)) + assert(e8.getMessage.contains("invalid fileFormat: 'wrong'")) + } }