
[SPARK-29665][SQL] Refine the TableProvider Interface #26868

Closed
wants to merge 2 commits into master from the provider2 branch

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Dec 12, 2019

What changes were proposed in this pull request?

Instead of having several overloads of the getTable method in TableProvider, it's better to have two explicit methods, inferSchema and inferPartitioning, plus a single getTable method that takes everything: schema, partitioning, and properties. (A sketch of the resulting interface shape follows the list below.)

This PR also adds a supportsExternalMetadata method to TableProvider, to indicate whether the source supports external table metadata. If this flag is false:

  1. spark.read.schema... is disallowed and fails.
  2. When we support creating v2 tables in the session catalog, Spark only keeps table properties in the catalog.
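For reference, here is a rough sketch of the interface shape described above. The signatures are approximated from this description rather than copied from the patch:

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public interface TableProvider {
  // Infer the table schema from the given options (e.g. by listing files).
  StructType inferSchema(CaseInsensitiveStringMap options);

  // Infer the table partitioning; unpartitioned sources return an empty array.
  default Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
    return new Transform[0];
  }

  // Single getTable method that takes everything: schema, partitioning, properties.
  Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties);

  // Whether Spark may pass in externally stored metadata, such as a
  // user-specified schema or schema/partitioning tracked by a catalog.
  default boolean supportsExternalMetadata() {
    return false;
  }
}

With this shape, a fixed-schema source implements inferSchema trivially, while sources that accept external metadata additionally override supportsExternalMetadata.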

Why are the changes needed?

API improvement.

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests

@cloud-fan cloud-fan force-pushed the provider2 branch 2 times, most recently from e7f1884 to 5aecc51 on December 12, 2019 at 15:51
@SparkQA

SparkQA commented Dec 12, 2019

Test build #115244 has finished for PR 26868 at commit 5aecc51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])

@cloud-fan
Contributor Author

cc @rdblue @brkyvz

@rdblue
Contributor

rdblue commented Dec 19, 2019

Overall, I'm -1 on this PR.

This confuses a few things. For example, it confuses user-specified schemas with the SupportsExternalMetadata case. Support for a user-specified schema does require the ability to pass the schema in, but the methods added by SupportsExternalMetadata are actually required when using a "generic" metastore, to pass in the metastore's schema and partitioning. (As a separate discussion, I think that user-specified schema support should be an explicit trait to signal support.)

This also confuses the use case where "a data source has a metastore". If a source should go to a metastore, then we want to encourage the use of SupportsCatalogOptions (#26913) to instantiate the table correctly using a catalog. That approach also fixes the currently unsupported SaveModes. TableProvider.getTable would not be helpful in this case.

The main use case for the TableProvider API is to support tables that do not use a catalog (sources with a catalog should go through SupportsCatalogOptions). Clearly, we need SupportsExternalMetadata.getTable for the case where a generic metastore needs to create a table. The question is whether we also need TableProvider.getTable(properties).

So far, the only argument I've heard in favor of having both getTable methods is that it is easier to port the file sources. I don't think it is worth adding an extra interface that must be implemented in order to use the built-in Spark catalog, just to avoid some trouble with the file source implementations. This makes the API more difficult to understand, and I think people will be surprised when they can't use the built-in generic catalog even though they've implemented TableProvider.

@cloud-fan
Contributor Author

@rdblue I think there is some misunderstanding here. I did this change to make the API simpler for non-file sources.

Let's walk through some common data sources:

  1. Simple sources that report fixed schemas, like the Kafka source. For them, a simple getTable(properties) is best.
  2. Sources that can infer their schema or accept a user-specified schema, like the file sources. For them, I agree with you that having explicit infer methods is better, so this PR introduces SupportsExternalMetadata with the infer methods and getTable(schema, partitioning, properties).
  3. Sources that have their own metastore, like a JDBC source. For them, I also agree with you that SupportsCatalogOptions ([SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider #26913) is a good solution (a rough sketch follows below).

If we have a source that implements SupportsCatalogOptions, it would be confusing to also have to implement the infer methods. With this PR, SupportsExternalMetadata is a mixin, so such a source doesn't have to implement them.
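For case 3 above, here is a rough sketch of what such a source could look like, assuming the SupportsCatalogOptions API proposed in #26913 (extractIdentifier/extractCatalog). The JdbcCatalogSource class name and the option keys are purely illustrative:

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Illustrative only: a JDBC-like source that delegates table resolution to a
// registered catalog, so it never has to implement the infer methods itself.
class JdbcCatalogSource implements SupportsCatalogOptions {
  @Override
  public Identifier extractIdentifier(CaseInsensitiveStringMap options) {
    // Assumes an option such as dbtable=db.tbl and maps it to a catalog identifier.
    String[] parts = options.get("dbtable").split("\\.");
    return Identifier.of(new String[]{parts[0]}, parts[1]);
  }

  @Override
  public String extractCatalog(CaseInsensitiveStringMap options) {
    // Name of the catalog plugin, configured via spark.sql.catalog.<name>.
    return options.getOrDefault("catalog", "jdbc_catalog");
  }

  // The inherited TableProvider methods are not expected to be exercised when
  // Spark resolves the table through the extracted catalog; stubs are provided
  // only so the sketch is self-contained.
  @Override
  public StructType inferSchema(CaseInsensitiveStringMap options) {
    throw new UnsupportedOperationException("Resolved through the catalog");
  }

  @Override
  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
    throw new UnsupportedOperationException("Resolved through the catalog");
  }
}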

@rdblue
Contributor

rdblue commented Dec 20, 2019

@cloud-fan, what is your rationale for saying "For [sources like Kafka], a simple getTable(properties) is the best."? You didn't give any argument why that is the case.

My understanding is that the existing Kafka source has a static schema, so inferSchema is easy to implement. For other Kafka implementations, people may use a schema store in which case the lookup is fairly easy (although we would encourage building a Kafka catalog in this case). I don't see how the two inference methods make it more difficult to implement in this case.

Having two getTable calls does make the API more confusing. If I want to store a Kafka stream in the built-in generic catalog, we agree that the catalog should pass the schema and partitioning to TableProvider.getTable (your point 2). That means that both getTable(properties) and getTable(schema, partitioning, properties) must be implemented. And if an author doesn't implement the optional SupportsExternalMetadata interface, the source won't work with a generic metastore. I think that's a significant problem and surprising behavior.

Like I said, we clearly need getTable(schema, partitioning, properties). I don't think that we actually need a second variant, especially when implementing only the "simple" version doesn't work with the built-in metastore.

@cloud-fan
Contributor Author

what is your rationale for saying "For [sources like Kafka], a simple getTable(properties) is the best."?

We can see this by looking at the code, but let me explain it here as well.

class KafkaProvider implements TableProvider {
  Table getTable(properties) {
    return new KafkaTable(properties)
  }
}

class KafkaTable implements Table {
  StructType schema() {
    return the_fixed_schema;
  }

  Transform[] partitioning() {
    return new Transform[0];
  }

  ScanBuilder ...
  WriteBuilder ...
}

This is simpler than the version below, as we don't need to worry about whether the passed-in schema and partitioning are wrong.

class KafkaProvider implements TableProvider {
  StructType inferSchema() {
    return the_fixed_schema;
  }

  Transform[] inferPartitioning() {
    return new Transform[0];
  }

  Table getTable(schema, partitioning, properties) {
    assert(schema == the_fixed_schema)
    assert(partitioning.isEmpty)
    return new KafkaTable(schema, properties)
  }
}

class KafkaTable(schema) implements Table {
  StructType schema() {
    return this.schema;
  }

  Transform[] partitioning() {
    return new Transform[0];
  }

  ScanBuilder ...
  WriteBuilder ...
}

If I want to store a Kafka stream in the built-in generic catalog, we agree that catalog should pass the schema and partitioning to TableProvider.getTable (Your point 2.). That means that both getTable(properties) and getTable(schema, partitioning, properties) must be implemented.

In the last sync, I think we agreed that we should have a "flag" to let Spark not store the schema/partitioning in the built-in generic catalog. SupportsExternalMetadata is that flag. If a source doesn't implement SupportsExternalMetadata, then Spark won't store the schema/partitioning in the built-in catalog. When we scan this table, Spark just calls getTable(properties) and asks the source to report the schema/partitioning.
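To make this concrete, here is a hypothetical sketch (not Spark code) of the resolution logic described above. BaseProvider and ExternalMetadataProvider are illustrative stand-ins for the TableProvider/SupportsExternalMetadata split discussed in this thread, and the resolve helper is invented for the example:

import java.util.Map;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Stand-in for the base TableProvider: the source owns its own metadata.
interface BaseProvider {
  Table getTable(CaseInsensitiveStringMap options);
}

// Stand-in for the SupportsExternalMetadata mixin: the infer methods plus the
// getTable variant that accepts schema/partitioning from outside.
interface ExternalMetadataProvider extends BaseProvider {
  StructType inferSchema(CaseInsensitiveStringMap options);
  Transform[] inferPartitioning(CaseInsensitiveStringMap options);
  Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties);
}

final class TableResolutionSketch {
  static Table resolve(BaseProvider provider, CaseInsensitiveStringMap options) {
    if (provider instanceof ExternalMetadataProvider) {
      // External metadata is allowed: the schema/partitioning could come from
      // the user or a catalog; this sketch simply falls back to inference.
      ExternalMetadataProvider p = (ExternalMetadataProvider) provider;
      StructType schema = p.inferSchema(options);
      Transform[] partitioning = p.inferPartitioning(options);
      return p.getTable(schema, partitioning, options.asCaseSensitiveMap());
    }
    // Otherwise Spark stores no schema/partitioning in the built-in catalog and
    // just asks the source for the table; the returned Table reports its own
    // schema() and partitioning().
    return provider.getTable(options);
  }
}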

@cloud-fan
Contributor Author

I've added a note about how TableProvider should work with the built-in generic catalog in the "Why are the changes needed?" section.

@rdblue
Contributor

rdblue commented Dec 23, 2019

@cloud-fan, thanks for bringing up a flag to not store schema and partitioning in the catalog. I don't recall that discussion, but maybe I misinterpreted what was said. I had thought that not implementing SupportsExternalMetadata would prevent a generic catalog from tracking the table, but you're right that it could just ignore the schema and partitioning in the catalog. But even with that cleared up, I don't think having two getTable methods is a good idea.

It would be surprising if the simplest source implementation could be stored in the built-in catalog, but defaulted to opting out of tracking schema and partitioning with that catalog. Having the schema and partitioning tracked by the catalog is a major reason to use the built-in catalog. Implementations with a different source of truth will plug in a catalog, so the main benefit of using the generic catalog is to track a source's metadata. I think that implementing SupportsExternalMetadata would be far more common than not, so it should be the default. (There's even reason for a Kafka source to use this for partitioning or a record schema; it's just not what the built-in Kafka source does.)

Thanks for posting the pseudo-code for Kafka. I see what you're saying, but the second option is not so arduous that we must provide a simplification. And we also have to consider a similar situation for a source that does implement SupportsExternalMetadata. Should Spark provide a default getTable(properties) that throws an exception? Or do sources implement a getTable method that probably can't return a table if it is called because the schema is missing? Having two methods is worse for the more common case.

In short, I think the more common case is sources that accept the schema from the metastore. It will be simpler to understand and use the API if we don't have two getTable methods. And it isn't unreasonable for sources that don't use an external schema to either check the one passed in or ignore it -- after all, Spark will guarantee that the schema is either the one from inferSchema or the one from the metastore (or user-specified if that's supported).

*/
Table getTable(CaseInsensitiveStringMap options);
Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties);
Contributor Author

@cloud-fan cloud-fan Dec 24, 2019

Shall we use CaseInsensitiveStringMap as the table properties here? People can easily get the original case-sensitive map via asCaseSensitiveMap.

cc @rdblue @brkyvz

Contributor

I think this needs to be case-preserving, because it'll come from the metastore, right?

Contributor Author

Yes, let's keep it as it is.
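For illustration, a minimal example of the trade-off discussed here, assuming the behavior of org.apache.spark.sql.util.CaseInsensitiveStringMap (it wraps the original map and exposes it again via asCaseSensitiveMap):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class CasePreservationDemo {
  public static void main(String[] args) {
    // Key casing as it might arrive from a metastore.
    Map<String, String> original = new HashMap<>();
    original.put("Path", "/data/t1");

    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(original);
    System.out.println(options.get("path"));                   // /data/t1 (lookups ignore case)
    System.out.println(options.asCaseSensitiveMap().keySet()); // [Path]   (original casing preserved)
  }
}

The original casing survives, but every consumer has to remember to call asCaseSensitiveMap before persisting keys, which is presumably why the plain, case-preserving Map was kept here.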

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115744 has finished for PR 26868 at commit f08e40b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

@SparkQA

SparkQA commented Dec 24, 2019

Test build #115745 has finished for PR 26868 at commit c0481a2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

* limitations under the License.
*/

package org.apache.spark.sql.internal.connector
Contributor Author

org.apache.spark.sql.internal is a private package (as defined in project/SparkBuild.scala#Unidoc#ignoreUndocumentedPackages).

Contributor Author

I picked this instead of org.apache.spark.sql.execution.datasources.v2 because that package only exists in sql/core, and it would be weird to see an execution package in catalyst.

@@ -173,15 +173,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case _ => None
}
ds match {
case provider: TableProvider =>
// file source v2 does not support streaming yet.
Contributor Author

File source v2 has never been supported in streaming, as FileTable.capabilities doesn't include the streaming capabilities.

The reason I check it earlier: we now call TableProvider.inferSchema before checking the table capabilities, so the error becomes different if the path is not specified.

It's weird that the file source reports different errors between batch and streaming when the path is not specified; we should unify them. Here I just don't want to be blocked by an existing problem. cc @gengliangwang

@SparkQA

SparkQA commented Dec 27, 2019

Test build #115848 has finished for PR 26868 at commit 4cc3742.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

@SparkQA

SparkQA commented Dec 27, 2019

Test build #115852 has finished for PR 26868 at commit 59b0de1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

@SparkQA

SparkQA commented Jan 13, 2020

Test build #116653 has finished for PR 26868 at commit 86963e9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

@SparkQA

SparkQA commented Jan 14, 2020

Test build #116674 has finished for PR 26868 at commit ffb0b29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • implicit class PartitionTypeHelper(colNames: Seq[String])
  • trait SimpleTableProvider extends TableProvider
  • class NoopDataSource extends SimpleTableProvider with DataSourceRegister
  • class RateStreamProvider extends SimpleTableProvider with DataSourceRegister
  • class TextSocketSourceProvider extends SimpleTableProvider with DataSourceRegister with Logging

schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
assert(partitioning.isEmpty)
Contributor

Why is this an assertion? Wouldn't this be the API to create a table in DataFrameWriter with partitioning?

Contributor

Never mind, this isn't used for file-based tables.

if (provider.isInstanceOf[FileDataSourceV2]) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val partitioning = partitioningColumns.getOrElse(Nil).asTransforms
provider.getTable(df.schema, partitioning, dsOptions.asCaseSensitiveMap())
Contributor

df.schema.asNullable?

Contributor Author

good catch!

userSpecifiedSchema: Option[StructType]): Table = {
userSpecifiedSchema match {
case Some(schema) =>
if (provider.supportsExternalMetadata()) {
Contributor

Can we name this supportsUserSpecifiedSchema if it's only going to throw an error for this case?

Contributor Author

This is for future-proofing. The schema may come not only from a user-specified schema, but also from the Spark catalog (the CREATE TABLE USING case).

@SparkQA

SparkQA commented Jan 17, 2020

Test build #116949 has finished for PR 26868 at commit 518f35a.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 21, 2020

Test build #117192 has finished for PR 26868 at commit 3f1de1f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@brkyvz brkyvz left a comment

LGTM. We should do some cleanup later, but let's get this breaking interface change in.


def getTable(options: CaseInsensitiveStringMap): Table

private[this] var loadedTable: Table = _
Contributor

Wouldn't this cause issues if you load two different Kafka tables? Shouldn't this be an options-to-table map? I'd probably turn this into a Guava cache with an expiration so that you don't leak anything.

Contributor

Never mind, TableProviders have to be classes, not singleton objects. This should be fine.

// following reads would fail.
if (provider.isInstanceOf[FileDataSourceV2]) {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val partitioning = partitioningColumns.getOrElse(Nil).asTransforms
Contributor

you can use partitioningAsV2 here instead

@SparkQA

SparkQA commented Jan 30, 2020

Test build #117566 has finished for PR 26868 at commit ec1d682.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 30, 2020

Test build #117567 has finished for PR 26868 at commit 2ceb46e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

thanks for the review, merging to master!

@cloud-fan cloud-fan closed this in 9f42be2 Jan 31, 2020
@gatorsmile gatorsmile changed the title [SPARK-29665][SQL] refine the TableProvider interface [SPARK-29665][SQL] Refine the TableProvider Interface Feb 7, 2020