From 8dc19613500b6301d7c666680fbb153ca41fb04b Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 22 Jan 2021 21:11:03 -0800
Subject: [PATCH] wrap with View

---
 .../sql/catalyst/analysis/Analyzer.scala      |  7 +++-
 .../sql/catalyst/catalog/SessionCatalog.scala |  9 +----
 .../plans/logical/basicLogicalOperators.scala |  3 --
 .../spark/sql/execution/command/views.scala   | 39 +++++++++----------
 .../org/apache/spark/sql/DataFrameSuite.scala |  2 +-
 5 files changed, 27 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0e76ffda52a66..4f409756fdd18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1086,7 +1086,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         }
         view.copy(child = newChild)
       case p @ SubqueryAlias(_, view: View) =>
-        p.makeCopy(Array(p.identifier, resolveViews(view)))
+        p.copy(child = resolveViews(view))
       case _ => plan
     }
 
@@ -1099,6 +1099,11 @@ class Analyzer(override val catalogManager: CatalogManager)
         }
 
         EliminateSubqueryAliases(relation) match {
+          case v: View if v.isTempView && v.child.isInstanceOf[LeafNode] =>
+            // Inserting into a file-based temporary view is allowed.
+            // (e.g., spark.read.parquet("path").createOrReplaceTempView("t").
+            // The check on the rdd-based relation is done in PreWriteCheck.
+            i.copy(table = v.child)
           case v: View =>
             throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
           case other => i.copy(table = other)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 253becb6749c1..7054588427e70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, ImplicitCastInputTypes}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.SUBQUERY_TYPE_TAG
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -855,9 +854,7 @@ class SessionCatalog(
   def lookupTempView(table: String): Option[SubqueryAlias] = {
     val formattedTable = formatTableName(table)
     getTempView(formattedTable).map { view =>
-      val s = SubqueryAlias(formattedTable, view)
-      s.setTagValue(SUBQUERY_TYPE_TAG, "tempView")
-      s
+      SubqueryAlias(formattedTable, view)
     }
   }
 
@@ -866,9 +863,7 @@
     if (formattedDB == globalTempViewManager.database) {
       val formattedTable = formatTableName(table)
       getGlobalTempView(formattedTable).map { view =>
-        val s = SubqueryAlias(formattedTable, formattedDB, view)
-        s.setTagValue(SUBQUERY_TYPE_TAG, "tempView")
-        s
+        SubqueryAlias(formattedTable, formattedDB, view)
       }
     } else {
       None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 88ed55957dea9..80883e1cd5e43 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -955,8 +954,6 @@ case class SubqueryAlias(
 }
 
 object SubqueryAlias {
-  val SUBQUERY_TYPE_TAG = TreeNodeTag[String]("subQueryType")
-
   def apply(
       identifier: String,
       child: LogicalPlan): SubqueryAlias = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 6d88ee1a87814..2eb2262b52a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, PythonUDF, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.SUBQUERY_TYPE_TAG
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
@@ -123,18 +122,18 @@ case class CreateViewCommand(
         CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val catalogTable = prepareTemporaryView(
+        name,
+        sparkSession,
+        analyzedPlan,
+        aliasedPlan.schema,
+        originalText,
+        child)
       // If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
       val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
-        TemporaryViewRelation(
-          prepareTemporaryView(
-            name,
-            sparkSession,
-            analyzedPlan,
-            aliasedPlan.schema,
-            originalText,
-            child))
+        TemporaryViewRelation(catalogTable)
       } else {
-        aliasedPlan
+        View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
       }
       catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
@@ -147,17 +146,17 @@ case class CreateViewCommand(
         CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
+      val catalogTable = prepareTemporaryView(
+        viewIdent,
+        sparkSession,
+        analyzedPlan,
+        aliasedPlan.schema,
+        originalText,
+        child)
       val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
-        TemporaryViewRelation(
-          prepareTemporaryView(
-            viewIdent,
-            sparkSession,
-            analyzedPlan,
-            aliasedPlan.schema,
-            originalText,
-            child))
+        TemporaryViewRelation(catalogTable)
       } else {
-        aliasedPlan
+        View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
       }
       catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
     } else if (catalog.tableExists(name)) {
@@ -562,8 +561,6 @@ object ViewHelper {
       child.collect {
         case s @ SubqueryAlias(_, view: View) if view.isTempView =>
           Seq(s.identifier.qualifier :+ s.identifier.name)
-        case s: SubqueryAlias if s.getTagValue(SUBQUERY_TYPE_TAG).exists(_ == "tempView") =>
-          Seq(s.identifier.qualifier :+ s.identifier.name)
         case plan if plan.resolved => plan.expressions.flatMap(_.collect {
           case e: SubqueryExpression => collectTempViews(e.plan)
         }).flatten
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f36038222193b..7ee58f6824345 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1548,7 +1548,7 @@ class DataFrameSuite extends QueryTest
     val e2 = intercept[AnalysisException] {
       insertion.write.insertInto("indirect_ds")
     }
-    assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+    assert(e2.getMessage.contains("Inserting into a view is not allowed. View: `indirect_ds`"))
 
     // error case: insert into an OneRowRelation
     Dataset.ofRows(spark, OneRowRelation()).createOrReplaceTempView("one_row")
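
For context on the new `LeafNode` branch in `ResolveRelations`, below is a minimal, hypothetical sketch (not part of the patch) of the behavior it preserves. It assumes a local `SparkSession`, Spark 3.1-era APIs, and a writable scratch path; the object name and path are illustrative only. A temporary view created directly over a file source analyzes to a leaf relation, so an INSERT is redirected to the underlying file relation, while a query-defined temporary view is not a leaf and still fails with `insertIntoViewNotAllowedError`.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object InsertIntoTempViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical scratch path backing a file-based relation.
    val path = "/tmp/insert_into_temp_view_sketch"
    Seq((1, "a"), (2, "b")).toDF("id", "name").write.mode("overwrite").parquet(path)

    // File-based temp view: its analyzed plan is a leaf relation, so the
    // `case v: View if v.isTempView && v.child.isInstanceOf[LeafNode]` branch
    // rewrites the insert to target the underlying file relation.
    spark.read.parquet(path).createOrReplaceTempView("t")
    spark.sql("INSERT INTO t VALUES (3, 'c')")

    // Query-defined temp view: not a leaf plan, so the insert is still rejected.
    spark.sql("SELECT id, name FROM t").createOrReplaceTempView("v")
    try {
      spark.sql("INSERT INTO v VALUES (4, 'd')")
    } catch {
      case e: AnalysisException =>
        // Expected: "Inserting into a view is not allowed. View: ..."
        println(e.getMessage)
    }

    spark.stop()
  }
}
```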