Update for review comments.
rdblue committed Jul 24, 2019
1 parent 8a3b61d commit efc4bf7
Showing 9 changed files with 446 additions and 168 deletions.
@@ -294,8 +294,8 @@ query
;

insertInto
- : INSERT OVERWRITE TABLE multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable
- | INSERT INTO TABLE? multipartIdentifier partitionSpec? #insertIntoTable
+ : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)? #insertOverwriteTable
+ | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)? #insertIntoTable
| INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)? #insertOverwriteDir
;
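Note on the grammar change: making TABLE optional after INSERT OVERWRITE brings it in line with INSERT INTO, and letting (IF NOT EXISTS)? parse after INSERT INTO moves that rejection out of the grammar and into the AstBuilder, where it can produce a targeted error. A quick sketch of the behavior through the parser (illustration only; parsePlan stands for the Catalyst parser entry point used in the test suites below):

    // TABLE is now optional after INSERT OVERWRITE, so both forms parse:
    parsePlan("INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl SELECT * FROM source")
    parsePlan("INSERT OVERWRITE testcat.ns1.ns2.tbl SELECT * FROM source")

    // IF NOT EXISTS now parses after INSERT INTO; the AstBuilder change
    // below then rejects it with a specific message rather than a generic
    // syntax error:
    parsePlan("INSERT INTO testcat.ns1.ns2.tbl PARTITION (p1 = 3) IF NOT EXISTS SELECT * FROM source")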
@@ -384,10 +384,10 @@ package object dsl {

def insertInto(
table: LogicalPlan,
- overwrite: Boolean = false,
partition: Map[String, Option[String]] = Map.empty,
+ overwrite: Boolean = false,
ifPartitionNotExists: Boolean = false): LogicalPlan =
- InsertTableStatement(table, logicalPlan, overwrite, partition, ifPartitionNotExists)
+ InsertIntoStatement(table, partition, logicalPlan, overwrite, ifPartitionNotExists)

def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)

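Note on the dsl change: partition now precedes overwrite, matching the InsertIntoStatement constructor order. A hypothetical call site (dsl in scope, plan names invented for illustration):

    // Named arguments are unaffected by the reorder:
    val plan = table("t").select(star())
    plan.insertInto(table("s"), overwrite = true)

    // Positional/partitioned callers now pass the partition spec second;
    // this builds InsertIntoStatement(table("s"), Map("p1" -> Some("3")),
    //   plan, overwrite = true, ifPartitionNotExists = false):
    plan.insertInto(table("s"), Map("p1" -> Some("3")), overwrite = true)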
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
- import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertTableStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+ import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -264,19 +264,19 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx match {
case table: InsertIntoTableContext =>
val (tableIdent, partition, ifPartitionNotExists) = visitInsertIntoTable(table)
- InsertTableStatement(
+ InsertIntoStatement(
UnresolvedRelation(tableIdent),
+ partition,
query,
overwrite = false,
- partition,
ifPartitionNotExists)
case table: InsertOverwriteTableContext =>
val (tableIdent, partition, ifPartitionNotExists) = visitInsertOverwriteTable(table)
- InsertTableStatement(
+ InsertIntoStatement(
UnresolvedRelation(tableIdent),
+ partition,
query,
overwrite = true,
- partition,
ifPartitionNotExists)
case dir: InsertOverwriteDirContext =>
val (isLocal, storage, provider) = visitInsertOverwriteDir(dir)
@@ -297,6 +297,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val tableIdent = visitMultipartIdentifier(ctx.multipartIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

+ if (ctx.EXISTS != null) {
+   operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
+ }
+
(tableIdent, partitionKeys, false)
}

@@ -311,8 +315,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

val dynamicPartitionKeys: Map[String, Option[String]] = partitionKeys.filter(_._2.isEmpty)
if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
- throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
-   "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
+ operationNotAllowed("IF NOT EXISTS with dynamic partitions: " +
+   dynamicPartitionKeys.keys.mkString(","), ctx)
}

(tableIdent, partitionKeys, ctx.EXISTS() != null)
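Note on the AstBuilder changes: both rejections now go through operationNotAllowed, which throws a ParseException whose message starts with "Operation not allowed:". A sketch of the resulting behavior (hypothetical statements; the expected message fragments mirror the new DDLParserSuite assertions below):

    // IF NOT EXISTS without OVERWRITE is rejected in visitInsertIntoTable:
    parsePlan("INSERT INTO t PARTITION (p1 = 3) IF NOT EXISTS SELECT * FROM s")
    // => ParseException: Operation not allowed: INSERT INTO ... IF NOT EXISTS

    // IF NOT EXISTS combined with a dynamic partition (p2 has no value) is
    // rejected in visitInsertOverwriteTable:
    parsePlan("INSERT OVERWRITE TABLE t PARTITION (p1 = 3, p2) IF NOT EXISTS SELECT * FROM s")
    // => ParseException: Operation not allowed: IF NOT EXISTS with dynamic partitions: p2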
@@ -20,12 +20,12 @@ package org.apache.spark.sql.catalyst.plans.logical.sql
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
- * An INSERT TABLE statement, as parsed from SQL.
+ * An INSERT INTO statement, as parsed from SQL.
*
* @param table the logical plan representing the table.
* @param query the logical plan representing data to write to.
* @param overwrite overwrite existing table or partitions.
- * @param partition a map from the partition key to the partition value (optional).
+ * @param partitionSpec a map from the partition key to the partition value (optional).
* If the value is missing, dynamic partition insert will be performed.
* As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have
* Map('a' -> Some('1'), 'b' -> Some('2')),
@@ -34,17 +34,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
* @param ifPartitionNotExists If true, only write if the partition does not exist.
* Only valid for static partitions.
*/
- case class InsertTableStatement(
+ case class InsertIntoStatement(
table: LogicalPlan,
+ partitionSpec: Map[String, Option[String]],
query: LogicalPlan,
overwrite: Boolean,
- partition: Map[String, Option[String]],
ifPartitionNotExists: Boolean) extends ParsedStatement {

- // IF NOT EXISTS is only valid in INSERT OVERWRITE
- assert(overwrite || !ifPartitionNotExists)
- // IF NOT EXISTS is only valid in static partitions
- assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)
+ require(overwrite || !ifPartitionNotExists,
+   "IF NOT EXISTS is only valid in INSERT OVERWRITE")
+ require(partitionSpec.values.forall(_.nonEmpty) || !ifPartitionNotExists,
+   "IF NOT EXISTS is only valid with static partitions")

override def children: Seq[LogicalPlan] = query :: Nil
}
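Note on assert vs. require: require makes the invariant part of the constructor's contract. It throws IllegalArgumentException with the given message instead of a bare AssertionError, and unlike assert it is not elided by -Xdisable-assertions. A minimal sketch of the invariants, reusing the plan shapes from the test expectations below:

    val tbl = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl"))
    val query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source")))

    // OK: INSERT OVERWRITE with a static partition and IF NOT EXISTS.
    InsertIntoStatement(tbl, Map("p1" -> Some("3")), query,
      overwrite = true, ifPartitionNotExists = true)

    // Fails the first require: IF NOT EXISTS without OVERWRITE.
    InsertIntoStatement(tbl, Map("p1" -> Some("3")), query,
      overwrite = false, ifPartitionNotExists = true)

    // Fails the second require: IF NOT EXISTS with a dynamic partition.
    InsertIntoStatement(tbl, Map("p1" -> Some("3"), "p2" -> None), query,
      overwrite = true, ifPartitionNotExists = true)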
@@ -19,12 +19,12 @@ package org.apache.spark.sql.catalyst.parser

import java.util.Locale

+ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
- import org.apache.spark.sql.catalyst.analysis.AnalysisTest
+ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.dsl.plans._
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
- import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
+ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+ import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

@@ -617,61 +617,110 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("insert table: append") {
parseCompare("INSERT INTO TABLE testcat.ns1.ns2.tbl TABLE source",
table("source").insertInto(table("testcat", "ns1", "ns2", "tbl")))
test("insert table: basic append") {
Seq(
"INSERT INTO TABLE testcat.ns1.ns2.tbl SELECT * FROM source",
"INSERT INTO testcat.ns1.ns2.tbl SELECT * FROM source"
).foreach { sql =>
parseCompare(sql,
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map.empty,
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = false, ifPartitionNotExists = false))
}
}

test("insert table: append from another catalog") {
parseCompare("INSERT INTO TABLE testcat.ns1.ns2.tbl TABLE testcat2.db.tbl",
table("testcat2", "db", "tbl").insertInto(table("testcat", "ns1", "ns2", "tbl")))
parseCompare("INSERT INTO TABLE testcat.ns1.ns2.tbl SELECT * FROM testcat2.db.tbl",
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map.empty,
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("testcat2", "db", "tbl"))),
overwrite = false, ifPartitionNotExists = false))
}

test("insert table: append with partition") {
parseCompare(
"""
|INSERT INTO testcat.ns1.ns2.tbl
|PARTITION (p1 = 3, p2)
- |TABLE source
+ |SELECT * FROM source
""".stripMargin,
table("source")
.insertInto(
table("testcat", "ns1", "ns2", "tbl"),
partition = Map("p1" -> Some("3"), "p2" -> None)))
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map("p1" -> Some("3"), "p2" -> None),
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = false, ifPartitionNotExists = false))
}

test("insert table: overwrite") {
parseCompare("INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl TABLE source",
table("source").insertInto(table("testcat", "ns1", "ns2", "tbl"), overwrite = true))
Seq(
"INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl SELECT * FROM source",
"INSERT OVERWRITE testcat.ns1.ns2.tbl SELECT * FROM source"
).foreach { sql =>
parseCompare(sql,
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map.empty,
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = true, ifPartitionNotExists = false))
}
}

test("insert table: overwrite with partition") {
parseCompare(
"""
|INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl
|PARTITION (p1 = 3, p2)
- |TABLE source
+ |SELECT * FROM source
""".stripMargin,
table("source")
.insertInto(
table("testcat", "ns1", "ns2", "tbl"),
overwrite = true,
partition = Map("p1" -> Some("3"), "p2" -> None)))
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map("p1" -> Some("3"), "p2" -> None),
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = true, ifPartitionNotExists = false))
}

test("insert table: overwrite with partition if not exists") {
parseCompare(
"""
|INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl
|PARTITION (p1 = 3) IF NOT EXISTS
- |TABLE source
+ |SELECT * FROM source
""".stripMargin,
table("source")
.insertInto(
table("testcat", "ns1", "ns2", "tbl"),
overwrite = true,
partition = Map("p1" -> Some("3")),
ifPartitionNotExists = true))
InsertIntoStatement(
UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
Map("p1" -> Some("3")),
Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = true, ifPartitionNotExists = true))
}

test("insert table: if not exists with dynamic partition fails") {
val exc = intercept[AnalysisException] {
parsePlan(
"""
|INSERT OVERWRITE TABLE testcat.ns1.ns2.tbl
|PARTITION (p1 = 3, p2) IF NOT EXISTS
|SELECT * FROM source
""".stripMargin)
}

assert(exc.getMessage.contains("IF NOT EXISTS with dynamic partitions"))
assert(exc.getMessage.contains("p2"))
}

test("insert table: if not exists without overwrite fails") {
val exc = intercept[AnalysisException] {
parsePlan(
"""
|INSERT INTO TABLE testcat.ns1.ns2.tbl
|PARTITION (p1 = 3) IF NOT EXISTS
|SELECT * FROM source
""".stripMargin)
}

assert(exc.getMessage.contains("INSERT INTO ... IF NOT EXISTS"))
}

private case class TableSpec(
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAlias, Un
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType

@@ -184,13 +185,15 @@ class PlanParserSuite extends AnalysisTest {
}

test("insert into") {
+ import org.apache.spark.sql.catalyst.dsl.expressions._
+ import org.apache.spark.sql.catalyst.dsl.plans._
val sql = "select * from t"
val plan = table("t").select(star())
def insert(
partition: Map[String, Option[String]],
overwrite: Boolean = false,
ifPartitionNotExists: Boolean = false): LogicalPlan =
plan.insertInto(table("s"), overwrite, partition, ifPartitionNotExists)
InsertIntoStatement(table("s"), partition, plan, overwrite, ifPartitionNotExists)

// Single inserts
assertEqual(s"insert overwrite table s $sql",
Expand All @@ -208,13 +211,6 @@ class PlanParserSuite extends AnalysisTest {
plan.limit(1).insertInto("s").union(plan2.insertInto("u")))
}

test ("insert with if not exists") {
val sql = "select * from t"
intercept(s"insert overwrite table s partition (e = 1, x) if not exists $sql",
"Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [x]")
intercept[ParseException](parsePlan(s"insert overwrite table s if not exists $sql"))
}

test("aggregation") {
val sql = "select a, b, sum(c) as c from d group by a, b"

@@ -616,12 +612,11 @@
comparePlans(
parsePlan(
"INSERT INTO s SELECT /*+ REPARTITION(100), COALESCE(500), COALESCE(10) */ * FROM t"),
table("t")
.select(star())
.hint("COALESCE", Literal(10))
.hint("COALESCE", Literal(500))
.hint("REPARTITION", Literal(100))
.insertInto("s"))
InsertIntoStatement(table("s"), Map.empty,
UnresolvedHint("REPARTITION", Seq(Literal(100)),
UnresolvedHint("COALESCE", Seq(Literal(500)),
UnresolvedHint("COALESCE", Seq(Literal(10)),
table("t").select(star())))), overwrite = false, ifPartitionNotExists = false))

comparePlans(
parsePlan("SELECT /*+ BROADCASTJOIN(u), REPARTITION(100) */ * FROM t"),