From 4b3f184ec49b14a6c445ea57d6f562564ef3474d Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 25 Jan 2021 14:02:54 -0800 Subject: [PATCH] trying with View and optional CatalogTable --- .../sql/catalyst/analysis/Analyzer.scala | 36 ++++++++++++---- .../sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../spark/sql/catalyst/analysis/view.scala | 16 ++++++- .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../plans/logical/basicLogicalOperators.scala | 6 ++- .../sql/catalyst/analysis/AnalysisSuite.scala | 4 +- .../catalog/SessionCatalogSuite.scala | 10 ++++- .../spark/sql/execution/CacheManager.scala | 10 +++-- .../spark/sql/execution/command/views.scala | 42 ++++++++++--------- .../spark/sql/internal/CatalogImpl.scala | 2 +- 11 files changed, 88 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d97e067425bbb..6971b745adf18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -884,10 +884,14 @@ class Analyzer(override val catalogManager: CatalogManager) case write: V2WriteCommand => write.table match { case UnresolvedRelation(ident, _, false) => - lookupTempView(ident).map(EliminateSubqueryAliases(_)).map { - case r: DataSourceV2Relation => write.withNewTable(r) - case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted) - }.getOrElse(write) + lookupTempView(ident) + .map(EliminateSubqueryAliases(_)) + .map(EliminateDataFrameTempViews(_)) + .map { + case r: DataSourceV2Relation => write.withNewTable(r) + case _ => + throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted) + }.getOrElse(write) case _ => write } case u @ UnresolvedTable(ident, cmd) => @@ -1071,7 +1075,7 @@ class Analyzer(override val catalogManager: CatalogManager) // The view's child should be a logical plan parsed from the `desc.viewText`, the variable // `viewText` should be defined, or else we throw an error on the generation of the View // operator. - case view @ View(desc, isTempView, _, child) if !child.resolved => + case view @ View(Some(desc), isTempView, _, child) if !child.resolved => // Resolve all the UnresolvedRelations and Views in the child. val newChild = AnalysisContext.withAnalysisContext(desc) { val nestedViewDepth = AnalysisContext.get.nestedViewDepth @@ -1085,6 +1089,10 @@ class Analyzer(override val catalogManager: CatalogManager) } } view.copy(child = newChild) + // If view.desc is None, view must be storing a dataframe temp view. + case view @ View(None, isTempView, _, child) => + assert(isTempView && child.resolved) + view case p @ SubqueryAlias(_, view: View) => p.copy(child = resolveViews(view)) case _ => plan @@ -1104,8 +1112,8 @@ class Analyzer(override val catalogManager: CatalogManager) // (e.g., spark.read.parquet("path").createOrReplaceTempView("t"). // The check on the rdd-based relation is done in PreWriteCheck. 
i.copy(table = v.child) - case v: View => - throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table) + case v: View if v.desc.isDefined => + throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.get.identifier, table) case other => i.copy(table = other) } @@ -1130,8 +1138,9 @@ class Analyzer(override val catalogManager: CatalogManager) lookupRelation(u.multipartIdentifier, u.options, false) .map(EliminateSubqueryAliases(_)) .map { - case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError( - v.desc.identifier, write) + case v: View if v.desc.isDefined => + throw QueryCompilationErrors.writeIntoViewNotAllowedError( + v.desc.get.identifier, write) case u: UnresolvedCatalogRelation => throw QueryCompilationErrors.writeIntoV1TableNotAllowedError( u.tableMeta.identifier, write) @@ -3671,6 +3680,15 @@ object EliminateSubqueryAliases extends Rule[LogicalPlan] { } } +/** + * Removes [[View]] operators from the plan if they are temp views created by DataFrame API. + */ +object EliminateDataFrameTempViews extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case v: View if v.desc.isEmpty => v.child + } +} + /** * Removes [[Union]] operators from the plan if it just has one child. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 8ec9583125fd3..5232de8c4b75d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -426,7 +426,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // output, nor with the query column names, throw an AnalysisException. // If the view's child output can't up cast to the view output, // throw an AnalysisException, too. - case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) => + case v @ View(Some(desc), _, output, child) if child.resolved && !v.sameOutput(child) => val queryColumnNames = desc.viewQueryColumnNames val queryOutput = if (queryColumnNames.nonEmpty) { if (output.length != queryColumnNames.length) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index dfadf0a539948..f31f34a543c1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -56,7 +56,7 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // The child has the different output attributes with the View operator. Adds a Project over // the child of the view. 
- case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) => + case v @ View(Some(desc), _, output, child) if child.resolved && !v.sameOutput(child) => val resolver = conf.resolver val queryColumnNames = desc.viewQueryColumnNames val queryOutput = if (queryColumnNames.nonEmpty) { @@ -81,9 +81,21 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport { } Project(newOutput, child) + case v @ View(None, _, output, child) if !v.sameOutput(child) => + val queryOutput = child.output + // Map the attributes in the query output to the attributes in the view output by index. + val newOutput = output.zip(queryOutput).map { + case (attr, originAttr) if !attr.semanticEquals(originAttr) => + // `CheckAnalysis` already guarantees that the cast is a up-cast for sure. + Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId, + qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata)) + case (_, originAttr) => originAttr + } + Project(newOutput, child) + // The child should have the same output attributes with the View operator, so we simply // remove the View operator. - case View(_, _, _, child) => + case v @ View(_, _, _, child) => child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0ae9cae8af779..f95150ec37b69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -852,7 +852,7 @@ class SessionCatalog( parser.parsePlan(viewText) } View( - desc = metadata, + desc = Some(metadata), isTempView = isTempView, output = metadata.schema.toAttributes, child = viewPlan) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 28dfc0958ab6f..ff38aa066f1c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -143,6 +143,7 @@ abstract class Optimizer(catalogManager: CatalogManager) EliminateResolvedHint, EliminateSubqueryAliases, EliminateView, + EliminateDataFrameTempViews, ReplaceExpressions, RewriteNonCorrelatedExists, ComputeCurrentTime, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 80883e1cd5e43..7d35380b3f915 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -456,10 +456,11 @@ case class InsertIntoDir( * `CatalogTable.viewText`, should throw an error if the `viewText` is not defined. 
*/ case class View( - desc: CatalogTable, + desc: Option[CatalogTable], isTempView: Boolean, output: Seq[Attribute], child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation { + require(desc.isDefined || isTempView) override def producedAttributes: AttributeSet = outputSet @@ -470,7 +471,8 @@ case class View( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) override def simpleString(maxFields: Int): String = { - s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" + val viewIdent = desc.map{ d => s"${d.identifier}, "}.getOrElse("") + s"View ($viewIdent${output.mkString("[", ",", "]")})" } override def doCanonicalize(): LogicalPlan = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index f66871ee75ecc..c626f25e67f26 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -660,11 +660,11 @@ class AnalysisSuite extends AnalysisTest with Matchers { val batches = Batch("View", Once, EliminateView) :: Nil } val relation = LocalRelation(Symbol("a").int.notNull, Symbol("b").string) - val view = View(CatalogTable( + val view = View(Some(CatalogTable( identifier = TableIdentifier("v1"), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))), + schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType))))), isTempView = false, output = Seq(Symbol("a").int, Symbol("b").string), child = relation) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 98f9ce6fe9dbb..7a53dbb17de3f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -646,7 +646,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { // Look up a view. 
catalog.setCurrentDatabase("default") - val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes, + val view = View( + desc = Some(metadata), + isTempView = false, + output = metadata.schema.toAttributes, child = CatalystSqlParser.parsePlan(metadata.viewText.get)) comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))), SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view1"), view)) @@ -666,7 +669,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { assert(metadata.viewText.isDefined) assert(metadata.viewCatalogAndNamespace == Seq(CatalogManager.SESSION_CATALOG_NAME, "db2")) - val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes, + val view = View( + desc = Some(metadata), + isTempView = false, + output = metadata.schema.toAttributes, child = CatalystSqlParser.parsePlan(metadata.viewText.get)) comparePlans(catalog.lookupRelation(TableIdentifier("view2", Some("db3"))), SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view2"), view)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 0c6f22dea7b45..bb20c03c9893a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -159,11 +159,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { plan: LogicalPlan, cascade: Boolean, blocking: Boolean = false): Unit = { + val qe = spark.sessionState.executePlan(plan) + qe.assertAnalyzed() + val analyzedPlan = qe.analyzed + val shouldRemove: LogicalPlan => Boolean = if (cascade) { - _.find(_.sameResult(plan)).isDefined + _.find(_.sameResult(analyzedPlan)).isDefined } else { - _.sameResult(plan) + _.sameResult(analyzedPlan) } val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan)) this.synchronized { @@ -187,7 +191,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { // will keep it as it is. It means the physical plan has been re-compiled already in the // other thread. val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded - cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded + cd.plan.find(_.sameResult(analyzedPlan)).isDefined && !cacheAlreadyLoaded }) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index b5307b6a82313..9cd840697be70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -122,18 +122,19 @@ case class CreateViewCommand( CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) } val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) - val catalogTable = prepareTemporaryView( - name, - sparkSession, - analyzedPlan, - aliasedPlan.schema, - originalText, - child) // If there is no sql text (e.g. 
from Dataset API), we will always store the analyzed plan val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { - TemporaryViewRelation(catalogTable) + TemporaryViewRelation( + prepareTemporaryView( + name, + sparkSession, + analyzedPlan, + aliasedPlan.schema, + originalText, + child)) } else { - View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child) + assert(isTemporary) + View(None, isTemporary, aliasedPlan.output, aliasedPlan) } catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace) } else if (viewType == GlobalTempView) { @@ -146,17 +147,18 @@ case class CreateViewCommand( CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString) } val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) - val catalogTable = prepareTemporaryView( - viewIdent, - sparkSession, - analyzedPlan, - aliasedPlan.schema, - originalText, - child) val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) { - TemporaryViewRelation(catalogTable) + TemporaryViewRelation( + prepareTemporaryView( + viewIdent, + sparkSession, + analyzedPlan, + aliasedPlan.schema, + originalText, + child)) } else { - View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child) + assert(isTemporary) + View(None, isTemporary, aliasedPlan.output, aliasedPlan) } catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace) } else if (catalog.tableExists(name)) { @@ -495,8 +497,8 @@ object ViewHelper { path: Seq[TableIdentifier], viewIdent: TableIdentifier): Unit = { plan match { - case v: View => - val ident = v.desc.identifier + case v @ View(Some(desc), _, _, _) => + val ident = desc.identifier val newPath = path :+ ident // If the table identifier equals to the `viewIdent`, current view node is the same with // the altered view. We detect a view reference cycle, should throw an AnalysisException. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index e5f02d8abc263..c72e9fc023d92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -424,7 +424,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { // same way as how a permanent view is handled. This also avoids a potential issue where a // dependent view becomes invalid because of the above while its data is still cached. val viewText = viewDef match { - case v: View => v.desc.viewText + case View(Some(desc), _, _, _) => desc.viewText case _ => None } val plan = sparkSession.sessionState.executePlan(viewDef)
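
// ---------------------------------------------------------------------------
// Illustrative sketch of the scenario this patch targets; it is a minimal,
// self-contained example, not code from the patch. The session setup, view
// names, and data below are assumptions for illustration only. A temp view
// created through the DataFrame API has no SQL text, so with this change it is
// stored as View(desc = None, isTempView = true, ...) and later stripped by
// the new EliminateDataFrameTempViews rule, instead of carrying a synthesized
// CatalogTable; a view defined from SQL text still carries its CatalogTable.
// ---------------------------------------------------------------------------
import org.apache.spark.sql.SparkSession

object TempViewKindsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("temp-view-kinds")
      .getOrCreate()
    import spark.implicits._

    // DataFrame-based temp view: registered from an analyzed plan, no viewText.
    Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("df_view")

    // SQL-based temp view: has original SQL text, so it keeps a CatalogTable.
    spark.sql("CREATE TEMPORARY VIEW sql_view AS SELECT 1 AS id, 'a' AS name")

    // Reads resolve identically for both kinds of temp views.
    spark.table("df_view").show()
    spark.table("sql_view").show()

    spark.stop()
  }
}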