
[SPARK-30314] Add identifier and catalog information to DataSourceV2Relation #26957

Closed
wants to merge 26 commits

Conversation

@yuchenhuo (Contributor) commented Dec 20, 2019

What changes were proposed in this pull request?

Add identifier and catalog information in DataSourceV2Relation so it would be possible to do richer checks in checkAnalysis step.
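For reference, a minimal sketch of the relation's field list with the added information; whether the new fields should be Options (and whether to carry a plain CatalogPlugin/Identifier) is discussed in the review below, so treat the exact shape as illustrative:

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Field list only; the real class also mixes in the usual logical-plan traits.
case class DataSourceV2Relation(
    table: Table,
    output: Seq[AttributeReference],
    catalog: Option[CatalogPlugin],   // None when the table is instantiated purely from options
    identifier: Option[Identifier],   // None for e.g. DataFrameReader.load(paths: String*)
    options: CaseInsensitiveStringMap)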

Why are the changes needed?

In data source v2, table implementations are all customized, so we may not be able to get the resolved identifier from the tables themselves. Therefore we encode the table and catalog information in DSV2Relation so that this information is available without requiring any external changes.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests in the following suites:
CatalogManagerSuite.scala
CatalogV2UtilSuite.scala
SupportsCatalogOptionsSuite.scala
PlanResolutionSuite.scala

@brkyvz (Contributor) left a comment:

Left a couple of comments. It would be worthwhile to capture the catalog identifier used, if any.

@@ -38,6 +38,8 @@ import org.apache.spark.util.Utils
case class DataSourceV2Relation(
table: Table,
output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
Contributor:

You probably want the catalog identifier here too, not the plugin

@@ -784,7 +784,7 @@ class Analyzer(
identifier match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident) match {
case Some(table) => Some(DataSourceV2Relation.create(table))
case Some(table) => Some(DataSourceV2Relation.create(table, Some(catalog), Seq(ident)))
Contributor:

identifier.headOption will be the identifier for your catalog here
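A minimal illustration of that split, using a hypothetical name my_cat.db.tbl:

import org.apache.spark.sql.connector.catalog.Identifier

val nameParts = Seq("my_cat", "db", "tbl")          // what the parser hands the analyzer
val catalogName = nameParts.headOption              // Some("my_cat"), the catalog identifier
val tableIdent = Identifier.of(Array("db"), "tbl")  // the remaining parts form the table Identifier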

@@ -886,7 +886,7 @@ class Analyzer(
Some(relation)
}
case Some(table) =>
Some(DataSourceV2Relation.create(table))
Some(DataSourceV2Relation.create(table, Some(catalog), Seq(ident)))
Contributor:

You need newIdent here?
Also catalog will be CatalogManager.SESSION_CATALOG_NAME here

@@ -256,7 +256,7 @@ private[sql] object CatalogV2Util {
}

def loadRelation(catalog: CatalogPlugin, ident: Identifier): Option[NamedRelation] = {
loadTable(catalog, ident).map(DataSourceV2Relation.create)
loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Seq(ident)))
Contributor:

You will need to pass the catalog identifier down to here. UnresolvedV2Relation contains originalNameParts, which includes the catalog name.

// TODO: Pass the PathIdentifiers as the list to V2Relation once that's implemented.
Dataset.ofRows(
sparkSession,
DataSourceV2Relation.create(table, None, Seq.empty, dsOptions))
Contributor:

There will be a potential identifier and catalog for this after #26913

Contributor Author:

Then would it make more sense to make the interface take a String instead of Option[String]?

@@ -264,7 +264,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException("Cannot write data to TableProvider implementation " +
"if partition columns are specified.")
}
lazy val relation = DataSourceV2Relation.create(table, dsOptions)
lazy val relation = DataSourceV2Relation.create(table, None, Seq.empty, dsOptions)
Contributor:

There will be a potential identifier and catalog for this after #26913

@@ -379,7 +379,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case _: V1Table =>
return insertInto(TableIdentifier(ident.name(), ident.namespace().headOption))
case t =>
DataSourceV2Relation.create(t)
DataSourceV2Relation.create(t, Some(catalog), Seq(ident))
Contributor:

ditto on catalog name

@@ -526,7 +526,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))

case (SaveMode.Append, Some(table)) =>
AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan, extraOptions.toMap)
AppendData.byName(
DataSourceV2Relation.create(table, Some(catalog), Seq(ident)),
Contributor:

ditto on catalog name

@@ -158,7 +158,9 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
def append(): Unit = {
val append = loadTable(catalog, identifier) match {
case Some(t) =>
AppendData.byName(DataSourceV2Relation.create(t), logicalPlan, options.toMap)
AppendData.byName(
DataSourceV2Relation.create(t, Some(catalog), Seq(identifier)),
Contributor:

We probably need to use a complex extractor above to grab which catalog was used, if any.

@brkyvz (Contributor) commented Dec 20, 2019

cc @rdblue We were discussing adding the full identifiers to DataSourceV2Relation for auditing

@rdblue (Contributor) commented Dec 20, 2019

Thanks @brkyvz!

This sounds like a good idea overall. To clarify, the intent is to be able to report the name that was used to load a relation and not to actually use the catalog based on this, right? If that's the case, we can probably make this simpler by keeping the original Seq[String] rather than separating it into Identifier and catalog name. Fewer params for the relation is always a good thing.

@@ -195,7 +195,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case desc @ DescribeNamespace(catalog, namespace, extended) =>
DescribeNamespaceExec(desc.output, catalog, namespace, extended) :: Nil

case desc @ DescribeTable(DataSourceV2Relation(table, _, _), isExtended) =>
case desc @ DescribeTable(DataSourceV2Relation(table, _, _, _, _), isExtended) =>
Contributor:

Cases like this could be translated to DescribeTable(rel: DataSourceV2Relation, isExtended). Then we wouldn't need to change this matcher every time the relation changes. Might be a good idea to rewrite it that way now.
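A sketch of the suggested rewrite; the DescribeTableExec call is carried over from the surrounding strategy and should be treated as illustrative:

// Bind the whole relation instead of unapplying every field, so adding fields
// to DataSourceV2Relation later doesn't touch this matcher.
case desc @ DescribeTable(rel: DataSourceV2Relation, isExtended) =>
  DescribeTableExec(desc.output, rel.table, isExtended) :: Nil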

@yuchenhuo (Contributor Author):

@rdblue yes, the intention is just to make the fully resolved name available in the V2Relation so in the analysis step richer checks could be performed.

The main reason I went away from just Seq[String] is that the DataFrame reader seems to support loading from multiple paths at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L191. A single Seq[String] would not be able to handle that case (see the sketch below).

I will be on a vacation in the next two weeks. Will probably need to come back to this later.
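The multi-path case referred to above is the varargs overload of DataFrameReader.load; a minimal usage sketch with hypothetical paths:

// Two paths feed a single relation, so one flat Seq[String] would be ambiguous
// between a multi-part table name and a list of paths.
val df = spark.read
  .format("parquet")
  .load("/data/events/2019", "/data/events/2020")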

case tableProvider: TableProvider =>
val t = tableProvider.getTable(dsOptions)
if (t.supports(BATCH_WRITE)) {
t
(t, None, Seq.empty)
Contributor Author:

Any suggestion around what the identifier should be here?

case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
case Some(schema) => (provider.getTable(dsOptions, schema), None, Seq.empty)
case _ => (provider.getTable(dsOptions), None, Seq.empty)
Contributor Author:

Any suggestion around what the identifier should be here?

// Map from catalog back to it's original name for easy name look up, we don't use the
// CatalogPlugin's name as it might be different from the catalog name depending on
// implementation.
private val catalogIdentifiers = mutable.HashMap.empty[CatalogPlugin, String]
Contributor Author:

Do we need to handle v1SessionCatalog passed by the constructor here too?

Contributor:

no, that catalog doesn't have a name

Contributor:

I'd add a comment here stating that we will have a new instance of a catalog for each catalog name, therefore this reverse map works as intended. We should also add a test for it

Contributor:

Why is the reverse map needed? Can't we just call CatalogPlugin.name?

Contributor Author:

According to my understanding, CatalogPlugin.name depends on the underlying implementation, which might not be the actual identifier name for the catalog.

Contributor:

The plugin name is intended to prevent needing to do this. While we do rely on the catalog not to report the wrong name, I think it is reasonable to use it. I'm not strongly against this, though. If you think this is cleaner we can do that.
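A standalone sketch of the reverse-lookup idea under discussion, with hypothetical names (CatalogRegistry, register); it relies on the invariant that each registered catalog name gets its own plugin instance:

import scala.collection.mutable

import org.apache.spark.sql.connector.catalog.CatalogPlugin

class CatalogRegistry {
  private val byName = mutable.HashMap.empty[String, CatalogPlugin]
  // Reverse map; unambiguous because a separate plugin instance is created per name.
  private val byPlugin = mutable.HashMap.empty[CatalogPlugin, String]

  def register(name: String, plugin: CatalogPlugin): Unit = {
    byName(name) = plugin
    byPlugin(plugin) = name
  }

  // The name the catalog was registered under, which may differ from plugin.name().
  def catalogIdentifier(plugin: CatalogPlugin): Option[String] = byPlugin.get(plugin)
}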

@brkyvz (Contributor) left a comment:

Hi @yuchenhuo, is this still WIP? This is looking good to me.


* @param output the output attributes of this relation
* @param catalogIdentifier the string identifier for the catalog
* @param identifiers the identifiers for the v2 relation. For multipath dataframe, there could be
* more than one identifier
Contributor:

or none if a V2 relation is instantiated using options

@brkyvz (Contributor) commented Jan 10, 2020

We also need some tests around the DataFrameReader/Writer and DataFrameWriterV2

case Some(schema) => provider.getTable(dsOptions, schema)
case _ => provider.getTable(dsOptions)
case Some(schema) => (provider.getTable(dsOptions, schema), None, Nil)
case _ => (provider.getTable(dsOptions), None, Nil)
Contributor Author:

There is no catalog in this case; do we need to care what the table identifier is? I.e. here it looks like the paths are the identifiers? @brkyvz

Contributor:

They will be in the future, not yet though. This is fine for now

@@ -32,12 +32,19 @@ import org.apache.spark.util.Utils
* A logical plan representing a data source v2 table.
*
* @param table The table that this relation represents.
* @param output the output attributes of this relation
* @param catalogIdentifier the string identifier for the catalog. None if no catalog is specified
Contributor Author:

Is this a correct description?

Contributor:

yes

@yuchenhuo changed the title from [WIP][SPARK-30314] Add identifier and catalog information to DataSourceV2Relation to [SPARK-30314] Add identifier and catalog information to DataSourceV2Relation on Jan 13, 2020
@yuchenhuo requested a review from rdblue on January 13, 2020 18:01
@zsxwing (Member) commented Jan 13, 2020

ok to test

@yuchenhuo (Contributor Author):

@brkyvz @rdblue Thanks for the previous feedback! I've added tests for the DataFrame reader and writer. Please review again. Thanks in advance!

The only part I didn't test is def load(paths: String*) of DataFrameReader. I'm not so sure how to test that part.

SparkQA commented Jan 14, 2020

Test build #116672 has finished for PR 26957 at commit c80a155.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 14, 2020

Test build #116673 has finished for PR 26957 at commit 42bf872.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor) left a comment:

Minor comments. One question I'm wondering about is whether we should also keep track of the catalog even if the user didn't provide it in their query. We're filling in the session catalog even when it wasn't explicitly referenced via spark_catalog. I guess that should be fine.

// CatalogPlugin's name as it might be different from the catalog name depending on
// implementation. Catalog name <-> CatalogPlugin instance is a 1:1 mapping.
private val catalogIdentifiers = mutable.HashMap.empty[CatalogPlugin, String]
catalogIdentifiers(defaultSessionCatalog) = SESSION_CATALOG_NAME
Contributor:

Maybe you should do this as part of loadV2SessionCatalog?

Contributor Author:

No, the concern here is that if a user sets a custom session catalog implementation and Spark fails to load it, it falls back to using defaultSessionCatalog: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala#L88. So we need to set defaultSessionCatalog in the reverse map here.

.map(rel => desc.copy(table = rel))
.getOrElse(desc)
CatalogV2Util
.loadRelation(u.catalog, catalogManager.catalogIdentifier(u.catalog), u.tableName)
Contributor:

Would it be easier to have a separate util method for UnresolvedV2Relation? The interesting thing about UnresolvedV2Relation is that you have access to the originalNameParts. If that doesn't contain the catalog identifier, then we shouldn't need to add it to the DataSourceV2Relation

Contributor Author:

Sure, I will do the refactoring.

However, regarding originalNameParts: I think the major reason we are adding the two additional fields to the DSv2 relation is that neither the CatalogPlugin nor the Table implementation includes the resolved unique identifier for the table. Even if originalNameParts doesn't have a catalog name in it, it might point to some default catalog, in which case we would still want that catalog in the resolved unique identifier. Otherwise, we might have two V2Relations pointing to the same table but with different identifiers, which would be inconsistent and pretty confusing.

Contributor:

I think it's fine to reproduce the full identifier that was actually loaded by adding in catalog and namespace if they are missing. But I'm not sure it is a good idea to rely on this behavior since there is no guarantee.



AppendData.byName(DataSourceV2Relation.create(table), df.logicalPlan, extraOptions.toMap)
val v2Relation =
DataSourceV2Relation.create(table, catalogManager.catalogIdentifier(catalog), Seq(ident))
AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)

case (SaveMode.Overwrite, _) =>
Contributor:

It drops and recreates a table. It's a DDL operation instead of DML

case NonSessionCatalogAndIdentifier(catalog, identifier) =>
(catalog.asTableCatalog, tableName.headOption, identifier)
case SessionCatalogAndIdentifier(catalog, identifier) =>
(catalog.asTableCatalog, Some(CatalogManager.SESSION_CATALOG_NAME), identifier)
Contributor:

If they didn't provide the catalog, then I think we can leave this as empty?

Contributor Author:

Similar to the reason above. I think it's better to always encode the resolved catalog and table identifiers to avoid inconsistency.


private def save(name: String, mode: SaveMode, catalogOpt: Option[String]): Unit = {
val df = spark.range(10)
df.write.format(format).option("name", name).option("catalog", catalogName)
Contributor:

You're accidentally using catalogName here; it should be cName: catalogOpt.foreach(cName => dfr.option("catalog", cName))

* @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
* and [[WriteBuilder]].
*/
case class DataSourceV2Relation(
table: Table,
output: Seq[AttributeReference],
catalogIdentifier: Option[String],
identifiers: Seq[Identifier],
Contributor:

I don't think it is a good idea to have multiple identifiers here. DSv2 doesn't yet cover how file-based tables should work and I think we need a design document for them. Adding multiple identifiers here in support of something that has undefined behavior seems premature.

Design and behavior of path-based identifiers aside, a table should use one and only one identifier. When path-based tables are supported, I expect them to use a single Identifier with possibly more than one path embedded in it, like we do with the paths key.
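Purely illustrative of that expectation (the namespace/name layout below is made up, not defined Spark behavior):

import org.apache.spark.sql.connector.catalog.Identifier

// One Identifier per table, even for a path-based table: several paths could be
// embedded in the single name, much like the JSON-encoded "paths" option.
val pathIdent = Identifier.of(Array.empty[String], """["/data/events/2019","/data/events/2020"]""")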

Contributor Author:

I see. If the specification is that there should be one and only one identifier, shall I just define it as identifier: Identifier instead of identifier: Option[Identifier]? The tricky part is still load(paths: String*); I might need to use some placeholder or null if we choose not to use Option. What do you guys think? cc @brkyvz

Contributor:

Two things:

  1. There should be only one identifier for a table because it is what identified the table
  2. Let's use null for load(paths: String*) because path-based tables are not designed or supported in v2

DataSourceV2Relation.create(table)
DataSourceV2Relation.create(
table,
catalogManager.catalogIdentifier(catalog),
Contributor:

Why are we using "identifier" for the catalog name? Everywhere else we call it the catalog name, so I don't see a reason to make this more complicated.

Contributor Author:

Sure. I was thinking about making the naming convention consistent since it's called identifier for tables. I'm totally okay with CatalogName.

@rdblue (Contributor) commented Jan 15, 2020

@brkyvz, @yuchenhuo, is there a reason to use separate catalog and table identifiers? I think it would be better to use a single multi-part identifier that is whatever was passed in, a Seq[String]. That avoids leaking the v2 API's Identifier class into the Analyzer code, where we instead use Seq[String] everywhere else.

@yuchenhuo (Contributor Author):

@rdblue There are two main reasons why I choose to split the catalog and table identifiers:

  1. If we use a multi-part string, then we kind of need an implicit protocol that the leading string in the Seq is the catalog name and the rest is the namespaces and table name. This becomes extra tricky in the cases where we don't have a catalog name.
  2. Again, for the load(paths: String*) case, a multi-part identifier Seq[String] is simply not capable of representing it.

Is there any particular reason why we don't want to import the Identifier class in the Analyzer code? I just feel that enforcing more explicit typing might be good for future extensibility, but I do agree that this increases complexity and probably encodes duplicated information. I'm pretty new to this code, so feel free to suggest a better way to do this.

@rdblue (Contributor) commented Jan 15, 2020

@yuchenhuo, the mapping from multi-part identifier to table is already well-defined. I think using the same code for this is better.

We don't want to use the Identifier class because it is intended for the interface from Spark to tables, not for the Spark analyzer. The Spark analyzer works with Seq[String] and should continue to do so. Identifier is an interface, which means that Spark doesn't necessarily control the implementation. We have to be careful about interfaces like this because we can't necessarily rely on assumptions that we make in the analyzer, like the behavior of equals and hashCode.

@yuchenhuo (Contributor Author):

@rdblue @brkyvz Based on the discussion we have here, I think I understand a bit more about this.

So the key problem we are trying to solve is that unlike many other V2Commands https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L152 DataSourceV2Relation only includes the table information which doesn't contain the catalog and namespaces info.

Previously, I think I worried too much about whether CatalogPlugin or Table would implement name correctly. However, based on the current design the resolved path is generated through some customized parser anyway, e.g. TableProvider, so we have no choice but to use that kind of information.

I think there are two final candidate solutions here:

  1. Add just Seq[String] to DataSourceV2Relation, so that we would not need to depend too much on the Identifier and CatalogPlugin implementations. In terms of implementation, we would extract the table name, namespaces, and catalog name from the Identifier and CatalogPlugin and put them into a Seq.
  2. Add both Identifier and CatalogPlugin to DataSourceV2Relation, in the same way as many other V2Commands, e.g. CreateV2Table and AlterTable: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L152

I'm not sure if I understand it correctly, but it seems like the Analyzer's job is to resolve the unresolved name strings to an actual table, so I feel it makes sense for the Analyzer to pass the resolved Identifier and CatalogPlugin to the following steps, since the Analyzer has to do the resolution anyway. Therefore, I think the second approach seems better, and it's also consistent with the other V2 interfaces.

@rdblue (Contributor) commented Jan 22, 2020

@yuchenhuo, you're right that the create and replace plans already use Identifier and CatalogPlugin. I'm okay with either solution since Identifier is already used in the analyzer.

Also, I wouldn't worry about the TableProvider path very much. That's a degenerate case that wasn't (and isn't) well defined. Most people will use tables with identifiers. Sometimes those identifiers will need to be extracted from options, but identifiers are the normal case.

SparkQA commented Jan 23, 2020

Test build #117296 has finished for PR 26957 at commit ee1a612.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor) left a comment:

LGTM except for two comments. We should probably make catalog and identifier Options. There is one typo in the test.


private def save(name: String, mode: SaveMode, catalogOpt: Option[String]): Unit = {
val df = spark.range(10).write.format(format).option("name", name)
catalogOpt.foreach(cName => df.option("catalog", catalogName))
Contributor:

df.option("catalog", cName))

* @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
* and [[WriteBuilder]].
*/
case class DataSourceV2Relation(
table: Table,
output: Seq[AttributeReference],
catalog: CatalogPlugin,
Contributor:

can you annotate these with @Nullable?

* @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
* and [[WriteBuilder]].
*/
case class DataSourceV2Relation(
table: Table,
output: Seq[AttributeReference],
catalog: CatalogPlugin,
identifier: Identifier,
Contributor:

or should we leave them as Options?

val output = table.schema().toAttributes
DataSourceV2Relation(table, output, options)
DataSourceV2Relation(table, output, catalog, identifiers, options)
Contributor:

You can wrap with Option here
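A sketch of the wrapping being suggested, following the create signature shown in the diff above (treat the parameter list as illustrative):

def create(
    table: Table,
    catalog: CatalogPlugin,
    identifier: Identifier,
    options: CaseInsensitiveStringMap): DataSourceV2Relation = {
  val output = table.schema().toAttributes
  // Option(...) turns a null catalog/identifier into None on the relation.
  DataSourceV2Relation(table, output, Option(catalog), Option(identifier), options)
}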

SparkQA commented Jan 24, 2020

Test build #117338 has finished for PR 26957 at commit 600d289.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

case InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) =>
assert(r.catalog.exists(_ == catlogIdent))
assert(r.identifier.exists(_.name() == tableIdent))
case DescribeRelation(r: ResolvedTable, _, _) =>
Contributor Author:

@brkyvz, looking at @cloud-fan's recent change on this, I'm a little uncertain whether we should be passing the table identifier with the DataSourceV2Relation. It feels like the Analyzer should resolve DataSourceV2Relation to ResolvedTable if table information should be included?

Contributor:

That's still under discussion. ResolvedTable is only proposed for DDL commands IIUC

Contributor Author:

Got it, thanks!

SparkQA commented Jan 24, 2020

Test build #117365 has finished for PR 26957 at commit df29683.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yuchenhuo requested a review from rdblue on January 24, 2020 23:42
@brkyvz (Contributor) left a comment:

LGTM. Merging to master. Thanks @yuchenhuo

@asfgit closed this in d0800fc on Jan 26, 2020