[SPARK-28554][SQL] Adds a v1 fallback writer implementation for v2 data source codepaths #25348
Conversation
Test build #108613 has finished for PR 25348 at commit
Test build #108615 has finished for PR 25348 at commit
Do we have a plan to support read fallback?
val batchWrite = newWriteBuilder() match {
newWriteBuilder() match {
  case v1: V1WriteBuilder if isTruncate(deleteWhere) =>
    writeWithV1(v1.buildForV1Write(), SaveMode.Overwrite, writeOptions)
Overwrite is ambiguous and doesn't specify whether the table data should be truncated, replaced dynamically by partition, etc. It isn't possible for v1 sources to guarantee the right behavior -- deleting data that matches `deleteWhere` -- so v1 fallback should not be supported here.
Does it make sense to just do this, but only if `deleteWhere` is empty?
No. If you pass `SaveMode.Overwrite` into a v1 implementation, the behavior is undefined. So we shouldn't ever pass this.
I was talking to @brkyvz directly and we think that we can use `SupportsDelete` to make this possible. If a table supports deleting by filter, then we can run that first and then run the insert. But that should be done in a follow-up PR.
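A rough sketch of that follow-up idea, for illustration only: the trait shapes below are simplified stand-ins (not Spark's actual interfaces), and the helper name is hypothetical.

```scala
object V1OverwriteFallbackSketch {
  // Simplified stand-ins for the interfaces discussed above.
  trait Filter
  trait InsertableRelation { def insert(overwrite: Boolean): Unit }
  trait SupportsDelete { def deleteWhere(filters: Array[Filter]): Unit }

  // Hypothetical overwrite fallback: delete the rows matching the overwrite
  // condition through the v2 API, then run the insert as a plain append
  // through the v1 relation.
  def overwriteByExpressionWithV1(
      table: SupportsDelete,
      deleteWhere: Array[Filter],
      relation: InsertableRelation): Unit = {
    table.deleteWhere(deleteWhere)      // delete by filter first
    relation.insert(overwrite = false)  // then append the new data
  }
}
```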
    mode: SaveMode,
    options: CaseInsensitiveStringMap): RDD[InternalRow] = {
  relation.createRelation(
    sqlContext, mode, options.asScala.toMap, Dataset.ofRows(sqlContext.sparkSession, plan))
I think this should use the original options map that preserves the case that was passed in.
doWrite(batchWrite)
writer match {
  case v1: V1WriteBuilder =>
    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
The table was just created above, so both of these modes are incorrect. This should be `SaveMode.Append` because the table already exists.
 *
 * @since 3.0.0
 */
def buildForV1Write(): CreatableRelationProvider = {
Why not add a path for `InsertableRelation`?
`insert` semantics are weird, and it doesn't support passing in options. `CreatableRelationProvider` is more flexible. I also did a quick spot check of:
https://spark-packages.org/?q=tags%3A%22Data%20Sources%22
All sources that I checked support `CreatableRelationProvider`, but some don't support `InsertableRelation`.
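For reference, these are the two v1 interfaces being compared, abridged from `org.apache.spark.sql.sources`: `CreatableRelationProvider` receives the options map and the save mode, while `InsertableRelation` only receives the data and an overwrite flag.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.BaseRelation

trait CreatableRelationProvider {
  // The source gets the full options map and the SaveMode.
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}

trait InsertableRelation {
  // Only the data and an overwrite flag -- no options, no mode.
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```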
@@ -129,7 +137,8 @@ case class AtomicCreateTableAsSelectExec(
      }
      val stagedTable = catalog.stageCreate(
        ident, query.schema, partitioning.toArray, properties.asJava)
      writeToStagedTable(stagedTable, writeOptions, ident)
      val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
Same problem here. I think mode should always be Append because that's the only one with reliable behavior that matches v2.
        planLater(query),
        props,
        writeOptions,
        orCreate = orCreate) :: Nil
    }

    case AppendData(r: DataSourceV2Relation, query, _) =>
      AppendDataExec(r.table.asWritable, r.options, planLater(query)) :: Nil
      AppendDataExec(r.table.asWritable, r.options, query, planLater(query)) :: Nil
I think that falling back to v1 should use different plan nodes so that users can see that the v1 write API is used instead of v2. It would also help keep concerns separated in the exec nodes.
That's gonna require a separation at the catalog level.
We talked about this separately and I think this is okay for now. In the future, we can add a separate `Write` produced by the builder (like `Scan` on the read side) and use that for a separate plan.
also cc @RussellSpitzer (you may be interested)
    options: CaseInsensitiveStringMap): RDD[InternalRow] = {
  relation.createRelation(
    sqlContext, mode, options.asScala.toMap, Dataset.ofRows(sqlContext.sparkSession, plan))
  sparkContext.emptyRDD
I might be missing something here, but why does this return an RDD and why is it empty?
OK, so I see this is used in atomic table writes. That seems a bit wrong to me; shouldn't we just not support atomic table writes with the V1 fallback? It seems like we are violating the contract by returning an empty result regardless of what happens.
What is the contract? All data write commands have returned an empty result since the beginning of time; e.g. look at `SaveIntoDataSourceCommand`, `V2TableWriteExec`, `InsertIntoDataSourceCommand`.
val writtenRows = writer match {
  case v1: V1WriteBuilder =>
    writeWithV1(v1.buildForV1Write(), writeOptions)
  case v2 =>
    doWrite(v2.buildForBatch())
}
If this is always empty, why do we save it as `writtenRows` here? Is this just to hold a reference to the empty result set?
Yeah, it's pretty much dead code. I think if we decide to change what to return later, it's easier to change 1-2 places vs. 'n' different operators.
I'd like to add that support in a separate PR, but I do think it is valuable and we should have it.
@rdblue Addressed your comments. Can you please take a look when possible?
Test build #108737 has finished for PR 25348 at commit
Test build #108738 has finished for PR 25348 at commit
Test build #108746 has finished for PR 25348 at commit
override protected def doExecute(): RDD[InternalRow] = {
  writeBuilder match {
    case builder: SupportsTruncate if isTruncate(deleteWhere) =>
      writeWithV1(builder.truncate().asV1Writer.buildForV1Write(), writeOptions)
Then it's not a simple fallback now. People need to implement `SupportsTruncate` and `SupportsOverwrite`, and people need to update their `CreatableRelationProvider` implementation to apply the `deleteWhere` condition while the save mode is always append.

AFAIK a v1 source can write data with `SaveMode`, which has inconsistent behavior that we don't want to rely on. A v1 source can also write data with `InsertableRelation.insert`, which can append data or overwrite the entire table. I think those are the only two cases where we can safely fall back to a v1 source.
I've thought about it more, and since the only write mode we want to support is Append, I changed the behavior to use `InsertableRelation`. With regards to:

> Then it's not a simple fallback now. People need to implement SupportsTruncate and SupportsOverwrite.

I think this is fine. If a data source has already implemented `SaveMode.Overwrite`, it already has some logic to delete partitions. Implementing these as part of the WriteBuilder API shouldn't be too hard, and code reuse is possible.
An alternative is `SupportsDelete`, which could be used for both. We just can't support dynamic overwrite.
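To illustrate the kind of reuse described above, here is a sketch (with simplified stand-in traits and hypothetical names, not the final API) of a source whose existing SaveMode.Overwrite cleanup logic is surfaced through `SupportsTruncate`, while the write itself always goes through the v1 relation as an append.

```scala
object TruncateFallbackSketch {
  // Simplified stand-ins for the interfaces discussed in this thread.
  trait InsertableRelation { def insert(overwrite: Boolean): Unit }
  trait WriteBuilder
  trait SupportsTruncate extends WriteBuilder { def truncate(): WriteBuilder }
  trait V1WriteBuilder extends WriteBuilder { def buildForV1Write(): InsertableRelation }

  // Hypothetical builder: deleteAllData is the cleanup code the source already
  // had for its old SaveMode.Overwrite path; appendRelation is its existing
  // v1 append implementation.
  class ExampleV1FallbackBuilder(
      deleteAllData: () => Unit,
      appendRelation: InsertableRelation)
    extends V1WriteBuilder with SupportsTruncate {

    private var truncateFirst = false

    override def truncate(): WriteBuilder = { truncateFirst = true; this }

    override def buildForV1Write(): InsertableRelation = new InsertableRelation {
      override def insert(overwrite: Boolean): Unit = {
        if (truncateFirst) deleteAllData()       // reuse the existing delete logic
        appendRelation.insert(overwrite = false) // always append through v1
      }
    }
  }
}
```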
    plan: LogicalPlan) extends V1FallbackWriters {

  override protected def doExecute(): RDD[InternalRow] = {
    writeWithV1(writeBuilder.buildForV1Write())
Why pass the builder in rather than building and passing the `BatchWrite`? Is this trying to manage the life-cycle so that the write is only created if it will be executed?

If so, this may be a good reason to have a separate `Write` and `BatchWrite`, like we have for `Scan` and `BatchScan`.
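For context, this is roughly the separation the read side already has; a symmetric write side is sketched here with stand-in traits (an assumption about a possible future shape, not the API in this PR).

```scala
// Read side (DSv2): the builder produces a logical Scan, and the Scan is only
// turned into a physical Batch when the plan actually executes.
//   ScanBuilder.build(): Scan
//   Scan.toBatch(): Batch
//
// A hypothetical symmetric write side, sketched with stand-in traits:
trait BatchWrite                           // physical write, created at execution time
trait Write { def toBatch: BatchWrite }    // logical description of the write
trait WriteBuilder { def build(): Write }  // configured by the planner
```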
doWrite(batchWrite)
writeBuilder match {
  case v1: V1WriteBuilder => writeWithV1(v1.buildForV1Write())
  case v2 => doWrite(v2.buildForBatch())
Minor: should we rename `doWrite` to `writeWithV2`?
    writeOptions: CaseInsensitiveStringMap,
    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
    writeBuilder: WriteBuilder,
    query: SparkPlan) extends V2TableWriteExec {
Since thinking about the impact on `simpleString`, I realized that this is also going to delegate to the write builder for other methods as well, including `equals` and `hashCode`. Since we can't rely on the behavior of the write builder's `equals` and `hashCode` methods, I don't think the builder should be used as an argument to the plan case classes.

I think that also makes sense: case classes are algebraic types, so we shouldn't include objects in their definitions that don't behave like algebraic types.
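A small illustration of the equals/hashCode concern (hypothetical names, plain Scala):

```scala
// A builder with no value-based equality, like a typical WriteBuilder implementation.
class MyWriteBuilder

// A plan-like case class that embeds the builder directly.
case class AppendLikeExec(builder: MyWriteBuilder)

object CaseClassEqualityDemo extends App {
  val a = AppendLikeExec(new MyWriteBuilder)
  val b = AppendLikeExec(new MyWriteBuilder)
  // Case-class equality delegates to the builder's reference equality, so two
  // logically identical plan nodes compare as different.
  println(a == b) // prints: false
}
```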
@@ -259,27 +266,25 @@ case class AppendDataExec(
 */
case class OverwriteByExpressionExec(
    table: SupportsWrite,
    writeBuilder: WriteBuilder,
If the builder is no longer passed in, then I don't think this class needs to change at all.
Test build #109430 has finished for PR 25348 at commit
Test build #109431 has finished for PR 25348 at commit
  def buildForV1Write(): InsertableRelation

  // These methods cannot be implemented by a V1WriteBuilder.
  override final def buildForBatch(): BatchWrite = super.buildForBatch()
Not sure if this is required. Now WriteBuilder implementations need to be declared as `class ExampleBuilder extends WriteBuilder with V1WriteBuilder`.
Minor: would be nice to have a comment that the superclass is going to throw an exception.
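Roughly what that could look like, as a sketch with simplified stand-in traits (in the real `WriteBuilder`, the default implementation throws for capabilities a builder does not support; treat the exact message and members here as assumptions):

```scala
object V1WriteBuilderSketch {
  // Simplified stand-ins for the v2 write interfaces.
  trait BatchWrite
  trait InsertableRelation

  trait WriteBuilder {
    // Default implementation throws: a plain WriteBuilder does not support batch writes.
    def buildForBatch(): BatchWrite =
      throw new UnsupportedOperationException("batch write not supported")
  }

  trait V1WriteBuilder extends WriteBuilder {
    def buildForV1Write(): InsertableRelation

    // This method cannot be implemented by a V1WriteBuilder: the `final`
    // override pins it to the superclass behavior, which throws, so a v1
    // fallback source is only ever exercised through buildForV1Write().
    override final def buildForBatch(): BatchWrite = super.buildForBatch()
  }

  // Implementations then mix in both, as noted in the comment above:
  //   class ExampleBuilder extends WriteBuilder with V1WriteBuilder { ... }
}
```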
if (tables.containsKey(fullIdent)) {
  tables.get(fullIdent)
} else {
  // Table was created through the built-in catalog
I find this case a little odd. I think it makes sense to layer on in-memory tables because we need to return the same table instance. But why create an in-memory shadow table for tables that already exist?
Nevermind, I get it.
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

/** A SessionCatalog that always loads an in memory Table, so we can test write code paths. */
This doesn't always load an in-memory table, since `newTable` is abstract. Can you update the docs to be a bit more clear about what this does?
I'm not sure that it makes sense for this to be separate, since the table cache is in memory but tables aren't necessarily in memory.
+1 when tests pass
Test build #109435 has finished for PR 25348 at commit
Test build #109449 has finished for PR 25348 at commit
LGTM, merging to master!
### What changes were proposed in this pull request?
Add a `V1Scan` interface, so that data source v1 implementations can migrate to DS v2 much more easily.

### Why are the changes needed?
It's a lot of work to migrate v1 sources to DS v2. The new API added here can allow v1 sources to go through v2 code paths without implementing all the Batch, Stream, PartitionReaderFactory, ... stuff. We already have a v1 write fallback API after apache#25348.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
New test suite.

Closes apache#26231 from cloud-fan/v1-read-fallback.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

This PR adds a V1 fallback interface for writing to V2 Tables using V1 writer interfaces. The only supported SaveMode that will be called on the target table will be Append. The target table must use V2 interfaces such as `SupportsOverwrite` or `SupportsTruncate` to support Overwrite operations. It is up to the target DataSource implementation whether this operation can be atomic or not.

We do not support dynamicPartitionOverwrite, as we cannot call a `commit` method that actually cleans up the data in the partitions that were touched through this fallback.

How was this patch tested?

Will add tests and an example implementation after comments + feedback. This is a proposal at this point.
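As an end-to-end illustration of the write path this proposal adds, here is a hedged sketch with simplified stand-in traits mirroring the snippets quoted in the review above, not the exact merged code:

```scala
object V1FallbackUsageSketch {
  // Simplified stand-ins mirroring the review snippets above.
  trait InsertableRelation { def insert(overwrite: Boolean): Unit }
  trait BatchWrite
  trait WriteBuilder {
    def buildForBatch(): BatchWrite =
      throw new UnsupportedOperationException("batch write not supported")
  }
  trait V1WriteBuilder extends WriteBuilder {
    def buildForV1Write(): InsertableRelation
    override final def buildForBatch(): BatchWrite = super.buildForBatch()
  }

  // The exec node picks the v1 fallback when the builder advertises it;
  // otherwise it goes through the regular v2 batch path.
  def runWrite(writeBuilder: WriteBuilder): Unit = writeBuilder match {
    case v1: V1WriteBuilder =>
      v1.buildForV1Write().insert(overwrite = false) // fallback is append-only
    case v2 =>
      val batch = v2.buildForBatch() // hand off to the regular v2 write path (elided)
  }
}
```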