Skip to content

Commit

Permalink
wrap with View
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Jan 23, 2021
1 parent 71c01e8 commit 8dc1961
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}
view.copy(child = newChild)
case p @ SubqueryAlias(_, view: View) =>
p.makeCopy(Array(p.identifier, resolveViews(view)))
p.copy(child = resolveViews(view))
case _ => plan
}

Expand All @@ -1099,6 +1099,11 @@ class Analyzer(override val catalogManager: CatalogManager)
}

EliminateSubqueryAliases(relation) match {
case v: View if v.isTempView && v.child.isInstanceOf[LeafNode] =>
// Inserting into a file-based temporary view is allowed.
// (e.g., spark.read.parquet("path").createOrReplaceTempView("t").
// The check on the rdd-based relation is done in PreWriteCheck.
i.copy(table = v.child)
case v: View =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
case other => i.copy(table = other)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, ImplicitCastInputTypes}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.SUBQUERY_TYPE_TAG
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
Expand Down Expand Up @@ -855,9 +854,7 @@ class SessionCatalog(
def lookupTempView(table: String): Option[SubqueryAlias] = {
val formattedTable = formatTableName(table)
getTempView(formattedTable).map { view =>
val s = SubqueryAlias(formattedTable, view)
s.setTagValue(SUBQUERY_TYPE_TAG, "tempView")
s
SubqueryAlias(formattedTable, view)
}
}

Expand All @@ -866,9 +863,7 @@ class SessionCatalog(
if (formattedDB == globalTempViewManager.database) {
val formattedTable = formatTableName(table)
getGlobalTempView(formattedTable).map { view =>
val s = SubqueryAlias(formattedTable, formattedDB, view)
s.setTagValue(SUBQUERY_TYPE_TAG, "tempView")
s
SubqueryAlias(formattedTable, formattedDB, view)
}
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -955,8 +954,6 @@ case class SubqueryAlias(
}

object SubqueryAlias {
val SUBQUERY_TYPE_TAG = TreeNodeTag[String]("subQueryType")

def apply(
identifier: String,
child: LogicalPlan): SubqueryAlias = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, PythonUDF, ScalaUDF, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.SUBQUERY_TYPE_TAG
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.execution.aggregate.ScalaUDAF
Expand Down Expand Up @@ -123,18 +122,18 @@ case class CreateViewCommand(
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val catalogTable = prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child)
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child))
TemporaryViewRelation(catalogTable)
} else {
aliasedPlan
View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
}
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
Expand All @@ -147,17 +146,17 @@ case class CreateViewCommand(
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val catalogTable = prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child)
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(
prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child))
TemporaryViewRelation(catalogTable)
} else {
aliasedPlan
View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
}
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
Expand Down Expand Up @@ -562,8 +561,6 @@ object ViewHelper {
child.collect {
case s @ SubqueryAlias(_, view: View) if view.isTempView =>
Seq(s.identifier.qualifier :+ s.identifier.name)
case s: SubqueryAlias if s.getTagValue(SUBQUERY_TYPE_TAG).exists(_ == "tempView") =>
Seq(s.identifier.qualifier :+ s.identifier.name)
case plan if plan.resolved => plan.expressions.flatMap(_.collect {
case e: SubqueryExpression => collectTempViews(e.plan)
}).flatten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1548,7 +1548,7 @@ class DataFrameSuite extends QueryTest
val e2 = intercept[AnalysisException] {
insertion.write.insertInto("indirect_ds")
}
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
assert(e2.getMessage.contains("Inserting into a view is not allowed. View: `indirect_ds`"))

// error case: insert into an OneRowRelation
Dataset.ofRows(spark, OneRowRelation()).createOrReplaceTempView("one_row")
Expand Down

0 comments on commit 8dc1961

Please sign in to comment.