[SPARK-27845][SQL] DataSourceV2: InsertTable #24832
Conversation
Test build #106359 has finished for PR 24832 at commit
@@ -274,7 +284,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 override def visitInsertOverwriteTable(
     ctx: InsertOverwriteTableContext): InsertTableParams = withOrigin(ctx) {
   assert(ctx.OVERWRITE() != null)
-  val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+  val tableIdent = visitMultipartIdentifier(ctx.multipartIdentifier)
I think this needs to be updated to remove the `ParseException` thrown when `IF NOT EXISTS` is present and there are dynamic partitions. I think that is an analysis problem, not a parse problem.

Also, I don't see a reason why `IF NOT EXISTS` would not be supported with dynamic partitions. Wouldn't that fail if any partitions would be overwritten? It seems to make sense to me, but maybe there is a good reason why this is not allowed? @gatorsmile can you comment?
We discussed this in the DSv2 sync last night and decided to add a method to the write builder to pass this `IF NOT EXISTS` flag. This will be done in a follow-up to avoid over-complicating this commit.
Rebase and squash
Test build #106455 has finished for PR 24832 at commit
Test build #106950 has finished for PR 24832 at commit
Test build #106951 has finished for PR 24832 at commit
Will do DataFrameWriter.insertInto in a separate PR, so this PR is no longer WIP.
Will look into supporting
Test build #106952 has finished for PR 24832 at commit
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DropTableStatement, DropViewStatement, QualifiedColType}
+import org.apache.spark.sql.catalyst.plans.logical.sql._
Nit: this is a bad practice because it can cause git conflicts and pollutes the namespace.
It's probably okay here because there are only logical plans in that package, but in other places this causes problems when it imports packages as well as classes.
Test build #107141 has finished for PR 24832 at commit
private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
  override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
    val deleteKeys = dataMap.keys.filter { partValues =>
      filters.exists {
Looks like this matches a key if any value matches a filter expression. The `exists` Scaladoc says "Tests whether a predicate holds for at least one value", so this is implementing an OR of all the filters, but the desired behavior is an AND of all the filters.
This should be a `forall`.
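The distinction the two comments above draw can be illustrated outside of Spark. A minimal sketch (plain Scala predicates, not the PR's `Filter` types) showing that `exists` over a set of predicates implements OR while `forall` implements AND:

```scala
// Sketch: `exists` ORs a set of predicates; `forall` ANDs them.
object FilterSemantics extends App {
  // Stand-ins for the pushed-down filters (hypothetical predicates).
  val filters: Seq[Int => Boolean] = Seq(_ > 0, _ % 2 == 0)

  def matchesAny(v: Int): Boolean = filters.exists(f => f(v))  // OR of all filters
  def matchesAll(v: Int): Boolean = filters.forall(f => f(v))  // AND of all filters

  assert(matchesAny(3))   // 3 > 0 holds, so the OR matches
  assert(!matchesAll(3))  // 3 is odd, so the AND does not
  assert(matchesAll(4))   // both predicates hold for 4
  println("ok")
}
```

A key should be deleted only when every filter accepts it, which is the `forall` (AND) form.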
override def abort(messages: Array[WriterCommitMessage]): Unit = {
private object TruncateAndAppend extends TestBatchWrite {
  override def commit(messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
    dataMap = mutable.Map.empty
This should use `dataMap.clear` instead of re-assigning, because this is synchronized on the original `dataMap` instance. After reassignment, another thread will be able to enter a `synchronized` block on the new instance.
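The locking hazard described above is easy to demonstrate with plain Scala. A minimal sketch (assuming nothing beyond the standard library) contrasting reassignment with clearing in place:

```scala
import scala.collection.mutable

// Sketch: synchronizing on a field that gets reassigned vs. clearing in place.
object SyncDemo extends App {
  var dataMap: mutable.Map[String, Int] = mutable.Map("a" -> 1)

  // Reassigning inside the block swaps the lock object: a thread that later
  // evaluates `dataMap.synchronized` locks the *new* map, so it no longer
  // excludes a thread still holding the old instance's monitor.
  dataMap.synchronized { dataMap = mutable.Map.empty }

  // Clearing in place keeps one stable lock object for every thread.
  val stable = mutable.Map("a" -> 1)
  stable.synchronized { stable.clear() }
  assert(stable.isEmpty)
  println("ok")
}
```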
@rdblue You forgot to address this?
Thanks, I'll fix it.
val staticPartitionProjectList = {
  // check that the data column counts match
  val numColumns = table.output.size
  if (numColumns > staticPartitions.size + i.query.output.size) {
The `ResolveOutputRelation` rule will check this and produces a more useful error message: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2336-L2339

I think this rule should simply transform the query assuming that it will work, and let `ResolveOutputRelation` ensure that the types and number of columns align.
To do this, I think you just need to add all remaining query columns from the iterator after `table.output` is exhausted.
I agree, it'd be great if `ResolveOutputRelation` does all the checking and necessary casting. It seems like it already has most of the logic built in. This method can just look at the partition values, and then convert to `AppendData` or `OverwriteByExpression`.
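The restructuring suggested in this thread can be sketched with plain collections (column names as strings; all identifiers below are hypothetical stand-ins, not the PR's actual code): consume query columns for each table column, substitute literals for static partitions, append any leftovers, and leave arity/type checking to the analyzer's `ResolveOutputRelation` rule.

```scala
// Sketch: build the projection optimistically; defer validation to the analyzer.
object ProjectionSketch extends App {
  val tableColumns = Seq("id", "data", "p1")     // stand-in for table.output
  val staticPartitions = Map("p1" -> "2019")     // PARTITION (p1 = '2019')
  val queryColumns = Iterator("q_id", "q_data")  // stand-in for i.query.output

  val projectList = tableColumns.map { col =>
    staticPartitions.get(col) match {
      case Some(value)                  => s"'$value' AS $col" // literal for static partition
      case None if queryColumns.hasNext => queryColumns.next() // next query column
      case None                         => col                 // mismatch: let the analyzer flag it
    }
  } ++ queryColumns                              // keep remaining query columns, if any

  assert(projectList == Seq("q_id", "q_data", "'2019' AS p1"))
  println("ok")
}
```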
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
  throw new AnalysisException(
    s"Cannot write, IF NOT EXISTS is not supported for table: ${table.table.name}")
This uses `table` to refer to a `DataSourceV2Relation`, which causes this awkward reference because the relation has an actual `table`: `table.table.name`. It would be better to call the relation `rel` or `relation`.
  throw new AnalysisException(s"Cannot write: not enough columns")
}
val staticNames = staticPartitions.keySet |
This should validate that `staticPartitions` are all names for columns used in identity partitions. It is not valid to supply a static partition value for a non-partition column, and it is not allowed to supply a static partition value for transform-derived columns.
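A minimal sketch of that validation, using hypothetical stand-ins for Spark's transform classes (not the actual `Transform` API): accept a static value only for columns that appear as identity partitions.

```scala
// Sketch: static partition values are only valid for identity-partitioned columns.
object StaticPartitionCheck extends App {
  sealed trait Transform
  case class Identity(col: String) extends Transform // partition directly by column
  case class Days(col: String) extends Transform     // transform-derived partition

  def validate(staticPartitions: Map[String, String],
               partitioning: Seq[Transform]): Unit = {
    val identityCols = partitioning.collect { case Identity(c) => c }.toSet
    val bad = staticPartitions.keySet -- identityCols
    require(bad.isEmpty,
      s"Cannot use static value for non-identity partition columns: ${bad.mkString(", ")}")
  }

  validate(Map("p1" -> "2019"), Seq(Identity("p1"), Days("ts"))) // accepted
  val rejected =
    try { validate(Map("ts" -> "2019"), Seq(Days("ts"))); false }
    catch { case _: IllegalArgumentException => true }           // transform-derived: rejected
  assert(rejected)
  println("ok")
}
```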
conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
val query = | ||
if (staticPartitions.isEmpty) { |
If this is true, then this should avoid building the `staticPartitionProjectList`. I'd recommend refactoring that into a method, or moving it into the else block.
Test build #107707 has finished for PR 24832 at commit
@brkyvz, I've updated this PR since John is out on vacation. Could you have another look?
Test build #107708 has finished for PR 24832 at commit
Test build #107766 has finished for PR 24832 at commit
Test build #107799 has finished for PR 24832 at commit
Test build #107800 has finished for PR 24832 at commit
Test build #108065 has finished for PR 24832 at commit
Test build #108118 has finished for PR 24832 at commit
This LGTM. I have some very minor comments around the parser changes.
-    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?       #insertOverwriteTable
-    | INSERT INTO TABLE? tableIdentifier partitionSpec?                              #insertIntoTable
+    : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)?  #insertOverwriteTable
+    | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)?         #insertIntoTable
Do we need to wrap with parentheses, `(partitionSpec (IF NOT EXISTS)?)?`, like above? Otherwise, what happens if there's no `partitionSpec` but there is an `IF NOT EXISTS`? If the table does not exist, wouldn't that be CTAS?
It isn't supported either way, so why combine the two?
Got it.
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

if (ctx.EXISTS != null) {
what's the point of adding this to the parser, if we're not going to support it?
For a better error message that is testable. Before, there were no tests for this case and the error message listed expected symbols.
Also, since the PARTITION clause is optional for the above case, it shouldn't group the two together either. It is semantically incorrect because a write to a partitioned table is always a partitioned write.
case _ =>
  throw new IllegalArgumentException(s"Unknown filter attribute: $attr")
}
case f @ _ =>
Nit: no need for `@ _`.
@@ -23,17 +23,19 @@ import scala.collection.mutable

 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
-import org.apache.spark.sql.catalog.v2.expressions.Transform
+import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
are any of the changes here needed?
Looks like there were unused imports. I'll commit a fix.
-    : INSERT OVERWRITE TABLE tableIdentifier (partitionSpec (IF NOT EXISTS)?)?       #insertOverwriteTable
-    | INSERT INTO TABLE? tableIdentifier partitionSpec?                              #insertIntoTable
+    : INSERT OVERWRITE TABLE? multipartIdentifier (partitionSpec (IF NOT EXISTS)?)?  #insertOverwriteTable
+    | INSERT INTO TABLE? multipartIdentifier partitionSpec? (IF NOT EXISTS)?         #insertIntoTable
I think `(partitionSpec (IF NOT EXISTS)?)?` is better? `INSERT INTO TABLE ... IF NOT EXISTS` doesn't make sense. The `IF NOT EXISTS` is only for partitions.
Actually, `IF NOT EXISTS` doesn't make sense for partitions either. It's an append, not an overwrite, and it seems weird to me if we can't append to an existing partition.
Can we keep this unchanged and not add `IF NOT EXISTS` here?
This was changed to get a better error message. Instead of a parse exception that lists symbols, this is now a useful error message with a test.
 val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

 val dynamicPartitionKeys: Map[String, Option[String]] = partitionKeys.filter(_._2.isEmpty)
 if (ctx.EXISTS != null && dynamicPartitionKeys.nonEmpty) {
-  throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
-    "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
+  operationNotAllowed("IF NOT EXISTS with dynamic partitions: " +
why do we change the error message here?
This uses `operationNotAllowed` instead of throwing a custom `ParseException`, like other methods that do not allow specific combinations. I think it's a good idea to standardize on the existing method. This also makes the error message like the others, where it states clearly what is not allowed.
  assert(exc.getMessage.contains("p2"))
}

test("insert table: if not exists without overwrite fails") {
@cloud-fan, @brkyvz, this is the test that required adding the `IF NOT EXISTS` to `INSERT INTO`. I think it is better to have a good error message instead of relying on not being able to parse the statement.
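The testing benefit described here can be sketched without Spark (the parser stub below is hypothetical, not Spark's `AstBuilder`): a dedicated check yields a message a test can assert on, whereas a bare grammar failure only lists expected symbols.

```scala
// Sketch: a deliberate "operation not allowed" check produces a testable message.
object ErrorMessageTest extends App {
  final class ParseException(msg: String) extends Exception(msg)

  // Hypothetical stand-in for the parser's explicit IF NOT EXISTS check.
  def parseInsert(sql: String): Unit =
    if (sql.contains("IF NOT EXISTS") && !sql.contains("OVERWRITE"))
      throw new ParseException(
        "Operation not allowed: INSERT INTO ... IF NOT EXISTS")

  val msg =
    try { parseInsert("INSERT INTO t PARTITION (p1 = 1) IF NOT EXISTS SELECT 1"); "" }
    catch { case e: ParseException => e.getMessage }

  assert(msg.contains("INSERT INTO ... IF NOT EXISTS")) // message is meaningful
  println("ok")
}
```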
@brkyvz, that's correct. Tests also validate that the error messages for too many or too few columns are the ones from
Test build #108178 has finished for PR 24832 at commit
Magic 💯
Merging to master!
Thanks for fixing this @jzhuge! And thanks to @brkyvz and @cloud-fan for the reviews.
Thanks @rdblue for covering for me while I am on vacation, in addition to being a reviewer! I will rebase PR #24980 "[SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto".
What changes were proposed in this pull request?

Support multiple catalogs in the following InsertTable use cases:

Support matrix:

Notes:
- `STATIC` is the default Partition Overwrite Mode for data source tables.
- DSv2 tables currently do not support `IfPartitionNotExists`.

How was this patch tested?

New tests.
All existing catalyst and sql/core tests.