[SPARK-20831] [SQL] Fix INSERT OVERWRITE data source tables with IF NOT EXISTS #18050

@@ -366,7 +366,7 @@ package object dsl {
     def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
       InsertIntoTable(
         analysis.UnresolvedRelation(TableIdentifier(tableName)),
-        Map.empty, logicalPlan, overwrite, false)
+        Map.empty, logicalPlan, overwrite, ifPartitionNotExists = false)
 
     def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
@@ -410,17 +410,20 @@ case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) exten
  *                  would have Map('a' -> Some('1'), 'b' -> None).
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean)
+    ifPartitionNotExists: Boolean)
   extends LogicalPlan {
-  assert(overwrite || !ifNotExists)
-  assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
+  // IF NOT EXISTS is only valid in INSERT OVERWRITE
+  assert(overwrite || !ifPartitionNotExists)
+  // IF NOT EXISTS is only valid in static partitions
+  assert(partition.values.forall(_.nonEmpty) || !ifPartitionNotExists)
 
   // We don't want `table` in children as sometimes we don't want to transform it.
   override def children: Seq[LogicalPlan] = query :: Nil
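The two assertions encode the renamed flag's contract: IF NOT EXISTS is only meaningful for INSERT OVERWRITE, and only with a fully static partition spec. A minimal sketch of the three cases, using UnresolvedRelation stand-ins for the child plans (the table names here are hypothetical, not part of the PR):

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable

val target = UnresolvedRelation(TableIdentifier("t"))
val query = UnresolvedRelation(TableIdentifier("src"))

// Valid: INSERT OVERWRITE TABLE t PARTITION (p = '1') IF NOT EXISTS SELECT ... FROM src
InsertIntoTable(target, Map("p" -> Some("1")), query,
  overwrite = true, ifPartitionNotExists = true)

// Trips the first assertion: IF NOT EXISTS without OVERWRITE.
InsertIntoTable(target, Map("p" -> Some("1")), query,
  overwrite = false, ifPartitionNotExists = true)

// Trips the second assertion: a dynamic partition (p -> None) with IF NOT EXISTS.
InsertIntoTable(target, Map("p" -> None), query,
  overwrite = true, ifPartitionNotExists = true)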
@@ -303,7 +303,7 @@ object SQLConf {
   val HIVE_MANAGE_FILESOURCE_PARTITIONS =
     buildConf("spark.sql.hive.manageFilesourcePartitions")
       .doc("When true, enable metastore partition management for file source tables as well. " +
-        "This includes both datasource and converted Hive tables. When partition managment " +
+        "This includes both datasource and converted Hive tables. When partition management " +
         "is enabled, datasource tables store partition in the Hive metastore, and use the " +
         "metastore to prune partitions during query planning.")
       .booleanConf
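This config is relevant to the fix: the IF NOT EXISTS check in InsertIntoHadoopFsRelationCommand (below) can only see existing partitions when they are tracked in the metastore, which for file source tables is gated by this flag. A hedged sketch of enabling it and syncing on-disk partitions (the session setup and table name are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Partition management for file source tables (on by default in recent versions).
spark.conf.set("spark.sql.hive.manageFilesourcePartitions", "true")
// Sync partitions already on disk into the metastore so that
// INSERT OVERWRITE ... IF NOT EXISTS can find them.
spark.sql("MSCK REPAIR TABLE my_partitioned_table")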
@@ -176,14 +176,14 @@ class PlanParserSuite extends PlanTest {
     def insert(
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
-        ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+        ifPartitionNotExists: Boolean = false): LogicalPlan =
+      InsertIntoTable(table("s"), partition, plan, overwrite, ifPartitionNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
       insert(Map.empty, overwrite = true))
     assertEqual(s"insert overwrite table s partition (e = 1) if not exists $sql",
-      insert(Map("e" -> Option("1")), overwrite = true, ifNotExists = true))
+      insert(Map("e" -> Option("1")), overwrite = true, ifPartitionNotExists = true))
     assertEqual(s"insert into s $sql",
       insert(Map.empty))
     assertEqual(s"insert into table s partition (c = 'd', e = 1) $sql",
@@ -193,9 +193,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifPartitionNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifPartitionNotExists = false)))
   }
 
   test ("insert with if not exists") {
@@ -286,7 +286,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partition = Map.empty[String, Option[String]],
       query = df.logicalPlan,
       overwrite = mode == SaveMode.Overwrite,
-      ifNotExists = false)
+      ifPartitionNotExists = false)
   }
 }
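The writer API has no IF NOT EXISTS surface, so this path always passes ifPartitionNotExists = false: SaveMode.Overwrite keeps its usual replace semantics, and only the SQL form can skip existing partitions. A hedged usage sketch (`df` and the table name are hypothetical):

import org.apache.spark.sql.SaveMode

// Always overwrites matching data, regardless of whether partitions exist;
// contrast with: INSERT OVERWRITE TABLE target_table ... IF NOT EXISTS.
df.write.mode(SaveMode.Overwrite).insertInto("target_table")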
@@ -430,6 +430,7 @@ case class DataSource(
     InsertIntoHadoopFsRelationCommand(
       outputPath = outputPath,
       staticPartitions = Map.empty,
+      ifPartitionNotExists = false,
       partitionColumns = partitionAttributes,
       bucketSpec = bucketSpec,
       fileFormat = format,
@@ -142,8 +142,8 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         parts, query, overwrite, false) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
-    case InsertIntoTable(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false) =>
+    case i @ InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
       // to include those constant column values in the query result.
@@ -195,6 +195,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
       InsertIntoHadoopFsRelationCommand(
         outputPath,
         staticPartitions,
+        i.ifPartitionNotExists,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,
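As the comment in the matched case says, static partition values are folded into the query by appending constant columns with a Project. A rough sketch of that shape under the same Catalyst APIs (the helper name is mine, not the rule's actual code):

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// For INSERT OVERWRITE t PARTITION (p = 'a') SELECT c FROM src, conceptually
// append the constant 'a' as an extra output column named p.
def appendStaticPartition(query: LogicalPlan, name: String, value: String): LogicalPlan =
  Project(query.output :+ Alias(Literal(value), name)(), query)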
@@ -37,10 +37,13 @@ import org.apache.spark.sql.execution.command._
  *                         overwrites: when the spec is empty, all partitions are overwritten.
  *                         When it covers a prefix of the partition keys, only partitions matching
  *                         the prefix are overwritten.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
     staticPartitions: TablePartitionSpec,
+    ifPartitionNotExists: Boolean,
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
     fileFormat: FileFormat,
@@ -61,8 +64,8 @@ case class InsertIntoHadoopFsRelationCommand(
       val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }.mkString(", ")
-      throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " +
-        s"cannot save to file.")
+      throw new AnalysisException(s"Duplicate column(s): $duplicateColumns found, " +
+        "cannot save to file.")
     }
 
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
@@ -76,11 +79,12 @@ case class InsertIntoHadoopFsRelationCommand(
 
     var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
     var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+    var matchingPartitions: Seq[CatalogTablePartition] = Seq.empty
 
     // When partitions are tracked by the catalog, compute all custom partition locations that
     // may be relevant to the insertion job.
     if (partitionsTrackedByCatalog) {
-      val matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
+      matchingPartitions = sparkSession.sessionState.catalog.listPartitions(
         catalogTable.get.identifier, Some(staticPartitions))
       initialMatchingPartitions = matchingPartitions.map(_.spec)
       customPartitionLocations = getCustomPartitionLocations(
@@ -101,8 +105,12 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.ErrorIfExists, true) =>
         throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
-        deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
-        true
+        if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
+          false
+        } else {
+          deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+          true
+        }
       case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
         true
       case (SaveMode.Ignore, exists) =>
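The heart of the fix is the new (SaveMode.Overwrite, true) arm: when IF NOT EXISTS was specified and a matching static partition already exists, the command skips the job entirely instead of deleting and rewriting. Restated as a standalone predicate (my distillation, not the command's actual code):

import org.apache.spark.sql.SaveMode

// Whether the write job should run, given the save mode, whether the target
// path exists, and whether IF NOT EXISTS found an already-existing partition.
// Partition deletion on overwrite is elided to keep the sketch pure.
def shouldWrite(
    mode: SaveMode,
    pathExists: Boolean,
    ifPartitionNotExists: Boolean,
    partitionExists: Boolean): Boolean = (mode, pathExists) match {
  case (SaveMode.ErrorIfExists, true) => throw new IllegalStateException("path already exists")
  case (SaveMode.Overwrite, true) if ifPartitionNotExists && partitionExists => false
  case (SaveMode.Ignore, exists) => !exists
  case _ => true
}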
@@ -160,9 +160,9 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  */
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists)
-        if DDLUtils.isHiveTable(relation.tableMeta) =>
-      InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists)
+    case InsertIntoTable(r: CatalogRelation, partSpec, query, overwrite, ifPartitionNotExists)
+        if DDLUtils.isHiveTable(r.tableMeta) =>
+      InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists)
 
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
@@ -207,11 +207,11 @@ case class RelationConversions(
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan transformUp {
       // Write path
-      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists)
+      case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifPartitionNotExists)
         // Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
-        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-          !r.isPartitioned && isConvertible(r) =>
-        InsertIntoTable(convert(r), partition, query, overwrite, ifNotExists)
+        if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
+            !r.isPartitioned && isConvertible(r) =>
+        InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
 
       // Read path
       case relation: CatalogRelation
@@ -62,7 +62,7 @@ case class CreateHiveTableAsSelectCommand(
           Map(),
           query,
           overwrite = false,
-          ifNotExists = false)).toRdd
+          ifPartitionNotExists = false)).toRdd
     } else {
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
@@ -78,7 +78,7 @@ case class CreateHiveTableAsSelectCommand(
           Map(),
           query,
           overwrite = true,
-          ifNotExists = false)).toRdd
+          ifPartitionNotExists = false)).toRdd
     } catch {
       case NonFatal(e) =>
         // drop the created table.
@@ -71,14 +71,15 @@ import org.apache.spark.SparkException
  * }}}.
  * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
- * @param ifNotExists If true, only write if the table or partition does not exist.
+ * @param ifPartitionNotExists If true, only write if the partition does not exist.
+ *                             Only valid for static partitions.
  */
 case class InsertIntoHiveTable(
     table: CatalogTable,
     partition: Map[String, Option[String]],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends RunnableCommand {
+    ifPartitionNotExists: Boolean) extends RunnableCommand {
 
   override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -375,7 +376,7 @@ case class InsertIntoHiveTable(
 
       var doHiveOverwrite = overwrite
 
-      if (oldPart.isEmpty || !ifNotExists) {
+      if (oldPart.isEmpty || !ifPartitionNotExists) {
         // SPARK-18107: Insert overwrite runs much slower than hive-client.
         // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
         // version and we may not want to catch up new Hive version every time. We delete the
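On the Hive serde path the same idea appears as the guard above: the partition is loaded only when it does not already exist or IF NOT EXISTS was not given. Equivalently, by De Morgan (my restatement, not the command's code):

// Skip the write exactly when the partition already exists AND the user
// asked for IF NOT EXISTS; oldPartExists corresponds to oldPart.nonEmpty.
def shouldWritePartition(oldPartExists: Boolean, ifPartitionNotExists: Boolean): Boolean =
  !(oldPartExists && ifPartitionNotExists)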
@@ -166,72 +166,54 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     sql("DROP TABLE tmp_table")
   }
 
-  test("INSERT OVERWRITE - partition IF NOT EXISTS") {
-    withTempDir { tmpDir =>
-      val table = "table_with_partition"
-      withTable(table) {
-        val selQuery = s"select c1, p1, p2 from $table"
-        sql(
-          s"""
-             |CREATE TABLE $table(c1 string)
-             |PARTITIONED by (p1 string,p2 string)
-             |location '${tmpDir.toURI.toString}'
-           """.stripMargin)
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr", "a", "b"))
-
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b')
-             |SELECT 'blarr2'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
+  testPartitionedTable("INSERT OVERWRITE - partition IF NOT EXISTS") { tableName =>
[review thread]
Contributor: it's a little weird to test data source table insertion in InsertIntoHiveTableSuite...
Contributor: anyway, we can fix it later
@gatorsmile (Member, author, May 22, 2017): Create and move all these tests to a new suite InsertIntoTableSuite?
Member: They are still under hive/?
@viirya (Member, May 22, 2017): If it is not too verbose, I'd like to have them separated, instead of mixing together in one test suite under hive.
@gatorsmile (Member, author): We can move the test cases to a new one in /core and let InsertIntoHiveTableSuite extend that one.
@gatorsmile (Member, author): Yeah, we can do it as a test-only PR.
[/review thread]

+    val selQuery = s"select a, b, c, d from $tableName"
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 1, 4
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(1, 2, 3, 4))

-        var e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'newPartition'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3)
+         |SELECT 5, 6
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))

+    val e = intercept[AnalysisException] {
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tableName
+           |partition (b=2, c) IF NOT EXISTS
+           |SELECT 7, 8, 3
+         """.stripMargin)
+    }
+    assert(e.getMessage.contains(
+      "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [c]"))

-        e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |INSERT OVERWRITE TABLE $table
-               |partition (p1='a',p2) IF NOT EXISTS
-               |SELECT 'blarr3', 'b'
-             """.stripMargin)
-        }
-        assert(e.getMessage.contains(
-          "Dynamic partitions do not support IF NOT EXISTS. Specified partitions with value: [p2]"))
+    // If the partition already exists, the insert will overwrite the data
+    // unless users specify IF NOT EXISTS
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=2, c=3) IF NOT EXISTS
+         |SELECT 9, 10
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))

-        // If the partition already exists, the insert will overwrite the data
-        // unless users specify IF NOT EXISTS
[review thread]
Member: Seems IF NOT EXISTS works previously with INSERT OVERWRITE? It doesn't overwrite the existing data.
Member: Oh. This is for Hive table.
Member: So seems we should also add a test for IF NOT EXISTS with INSERT OVERWRITE for datasource table?
@gatorsmile (Member, author): That is why I used testPartitionedTable to do the test. It tests both data source and hive serde tables.
Member: Ok. Looks good.
[/review thread]
-        sql(
-          s"""
-             |INSERT OVERWRITE TABLE $table
-             |partition (p1='a',p2='b') IF NOT EXISTS
-             |SELECT 'blarr3'
-           """.stripMargin)
-        checkAnswer(
-          sql(selQuery),
-          Row("blarr2", "a", "b"))
-      }
-    }
+    // ADD PARTITION has the same effect, even if no actual data is inserted.
+    sql(s"ALTER TABLE $tableName ADD PARTITION (b=21, c=31)")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tableName
+         |partition (b=21, c=31) IF NOT EXISTS
+         |SELECT 20, 24
+       """.stripMargin)
+    checkAnswer(sql(selQuery), Row(5, 2, 3, 6))
+  }
 
   test("Insert ArrayType.containsNull == false") {