[SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider #26913
Conversation
Test build #115411 has finished for PR 26913 at commit
Test build #115469 has finished for PR 26913 at commit
Mostly looks good. The only real blocker is using …
Test build #115595 has finished for PR 26913 at commit
Test build #115631 has finished for PR 26913 at commit
Test build #115635 has finished for PR 26913 at commit
Test build #115637 has finished for PR 26913 at commit
It's good to see that we are looking at the big picture: supporting user-specified schemas, a catalog in the options, and schema/partition info in the builtin metastore.

For "1 TableProvider", I agree with the expectation here.

For "2 SupportsExternalMetadata", I think it can itself be used as a marker, and we don't need anything extra. Data sources don't care where the schema/partitioning come from: they can be inferred by the source itself, specified by end users, or taken from Spark's builtin catalog.

For "3 SupportsCatalogOptions", sounds good to me, but it looks better if we can fail with a "user-specified schema not supported" error.
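To make the "marker" idea concrete, here is an illustrative sketch only; SupportsExternalMetadata as written below is the proposal under discussion, not a shipped interface, and its signature is assumed:

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Illustrative only: a marker-style trait that says "the schema and
// partitioning are supplied from outside (user or metastore)", so the
// provider does not need to infer them itself.
trait SupportsExternalMetadata { self: TableProvider =>
  def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table
}
```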
```scala
assert(table.name() === s"$namespace.t1", "Table identifier was wrong")
assert(table.partitioning().length === partitionBy.length, "Partitioning did not match")
assert(table.partitioning().map(_.references().head.fieldNames().head) === partitionBy,
  "Partitioning was incorrect")
```
These assertions are probably easier if you use the extractors:

```scala
table.partitioning.head match {
  case IdentityTransform(FieldReference(field)) =>
    assert(field === Seq(partitionBy.head))
  case _ =>
    fail(s"Unexpected partitioning: ${table.partitioning.head}")
}
```
I noted a few things, but overall +1.
Retest this please.
One more thing: can we keep the discussion about that topic in its original thread? I think that's a good place to continue. I think this PR is independent.
Test build #115656 has finished for PR 26913 at commit
retest this please
Test build #115671 has finished for PR 26913 at commit
retest this please
Test build #115750 has finished for PR 26913 at commit
@cloud-fan Any more comments on this? Shall we merge this?
retest this please |
```java
 * topic name, etc. It's an immutable case-insensitive string-to-string map.
 */
default String extractCatalog(CaseInsensitiveStringMap options) {
  return null;
}
```
shall we by default return CatalogManager.SESSION_CATALOG_NAME instead of null?
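For context, a minimal sketch of the caller side under the current null default (hasCatalog and dsOptions are the names from the DataFrameWriter diff below); returning SESSION_CATALOG_NAME by default would make the fallback unnecessary:

```scala
import org.apache.spark.sql.connector.catalog.CatalogManager

// Sketch: with a null default, every call site needs this fallback; a
// SESSION_CATALOG_NAME default would move it into the interface itself.
val catalogName = Option(hasCatalog.extractCatalog(dsOptions))
  .getOrElse(CatalogManager.SESSION_CATALOG_NAME)
```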
```diff
- case Some(schema) => provider.getTable(dsOptions, schema)
- case _ => provider.getTable(dsOptions)
+ val table = provider match {
+   case hasCatalog: SupportsCatalogOptions =>
```
let's fail if the user specifies a schema.
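A minimal sketch of that check, assuming the existing provider, userSpecifiedSchema, and source fields in DataFrameReader/DataFrameWriter; the error message is illustrative:

```scala
import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions

// Sketch: reject a user-specified schema when the provider resolves its
// table through catalog options instead of accepting an external schema.
provider match {
  case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
    throw new IllegalArgumentException(
      s"$source does not support user specified schemas.")
  case _ => // fall through to the existing getTable resolution
}
```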
```scala
val partitioning = partitioningColumns.map { colNames =>
  colNames.map(name => IdentityTransform(FieldReference(name)))
}.getOrElse(Seq.empty[Transform])
val bucketing = bucketColumnNames.map { cols =>
```
shall we call CatalogV2Implicits.BucketSpecHelper.asTransform?
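For reference, a sketch of how that would read; getBucketSpec is assumed to be the writer's existing helper returning Option[BucketSpec]:

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Sketch: reuse the BucketSpec -> Transform conversion instead of
// hand-building the bucket transform from the column names.
val bucketing = getBucketSpec.map(_.asTransform).toSeq
val transforms = partitioning ++ bucketing
```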
LGTM except 3 comments.
Test build #116397 has finished for PR 26913 at commit
Thanks @rdblue and @cloud-fan. Merging to master.
Test build #116402 has finished for PR 26913 at commit
What changes were proposed in this pull request?

This PR introduces SupportsCatalogOptions as an interface for TableProvider. Through SupportsCatalogOptions, V2 DataSources can implement the two methods extractIdentifier and extractCatalog to support the creation and existence checks of tables without requiring a formal TableCatalog implementation.

We currently don't support all SaveModes for DataSourceV2 in DataFrameWriter.save. The idea here is that eventually file-based tables can be written with DataFrameWriter.save(path): this will create a PathIdentifier whose name is path, and the V2SessionCatalog will be able to perform FileSystem checks at path to support the ErrorIfExists and Ignore SaveModes. An illustrative sketch of a provider implementing the new interface appears at the end of this description.

Why are the changes needed?
To support all Save modes for V2 data sources with DataFrameWriter. Since we can now support table creation, we will be able to provide partitioning information when first creating the table as well.
Does this PR introduce any user-facing change?
Introduces a new interface, SupportsCatalogOptions.
How was this patch tested?
Will add tests once the interface is vetted.
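As promised above, an illustrative sketch of a provider opting in to the new interface. MyProvider and the "catalog"/"name" option keys are invented for this example; only extractIdentifier and extractCatalog come from this PR:

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsCatalogOptions, Table, TableProvider}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative sketch: the identifier and catalog are read from the write
// options, so DataFrameWriter.save can do existence checks via a catalog.
class MyProvider extends TableProvider with SupportsCatalogOptions {
  override def getTable(options: CaseInsensitiveStringMap): Table =
    ??? // the provider's usual table resolution, elided here

  override def extractIdentifier(options: CaseInsensitiveStringMap): Identifier =
    Identifier.of(Array("default"), options.get("name"))

  override def extractCatalog(options: CaseInsensitiveStringMap): String =
    options.get("catalog") // null falls back to the default resolution
}
```

With such a provider, a call like df.write.format(...).option("name", "t1").mode(SaveMode.Ignore).save() can be resolved to a catalog identifier, so the ErrorIfExists and Ignore modes become checkable before writing.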