Skip to content

Commit

Permalink
trying with View and optional CatalogTable
Browse files Browse the repository at this point in the history
  • Loading branch information
imback82 committed Jan 25, 2021
1 parent 7becf96 commit 4b3f184
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -884,10 +884,14 @@ class Analyzer(override val catalogManager: CatalogManager)
case write: V2WriteCommand =>
write.table match {
case UnresolvedRelation(ident, _, false) =>
lookupTempView(ident).map(EliminateSubqueryAliases(_)).map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ => throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
lookupTempView(ident)
.map(EliminateSubqueryAliases(_))
.map(EliminateDataFrameTempViews(_))
.map {
case r: DataSourceV2Relation => write.withNewTable(r)
case _ =>
throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(ident.quoted)
}.getOrElse(write)
case _ => write
}
case u @ UnresolvedTable(ident, cmd) =>
Expand Down Expand Up @@ -1071,7 +1075,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
case view @ View(desc, isTempView, _, child) if !child.resolved =>
case view @ View(Some(desc), isTempView, _, child) if !child.resolved =>
// Resolve all the UnresolvedRelations and Views in the child.
val newChild = AnalysisContext.withAnalysisContext(desc) {
val nestedViewDepth = AnalysisContext.get.nestedViewDepth
Expand All @@ -1085,6 +1089,10 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
view.copy(child = newChild)
// If view.desc is None, view must be storing a dataframe temp view.
case view @ View(None, isTempView, _, child) =>
assert(isTempView && child.resolved)
view
case p @ SubqueryAlias(_, view: View) =>
p.copy(child = resolveViews(view))
case _ => plan
Expand All @@ -1104,8 +1112,8 @@ class Analyzer(override val catalogManager: CatalogManager)
// (e.g., spark.read.parquet("path").createOrReplaceTempView("t")).
// The check on the rdd-based relation is done in PreWriteCheck.
i.copy(table = v.child)
case v: View =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
case v: View if v.desc.isDefined =>
throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.get.identifier, table)
case other => i.copy(table = other)
}

Expand All @@ -1130,8 +1138,9 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupRelation(u.multipartIdentifier, u.options, false)
.map(EliminateSubqueryAliases(_))
.map {
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.identifier, write)
case v: View if v.desc.isDefined =>
throw QueryCompilationErrors.writeIntoViewNotAllowedError(
v.desc.get.identifier, write)
case u: UnresolvedCatalogRelation =>
throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
u.tableMeta.identifier, write)
Expand Down Expand Up @@ -3671,6 +3680,15 @@ object EliminateSubqueryAliases extends Rule[LogicalPlan] {
}
}

/**
 * Removes [[View]] operators from the plan if they are temp views created by DataFrame API,
 * i.e. views whose `desc` is empty, by replacing each such operator with its child.
 */
object EliminateDataFrameTempViews extends Rule[LogicalPlan] {
  // This rule is applied while resolving V2 write commands in the analyzer and is also listed
  // among the optimizer rules. `resolveOperators` skips sub-trees that are already marked as
  // analyzed, which would turn the optimizer invocation into a no-op, so use `transformUp` and
  // explicitly allow it to be invoked from within the analyzer.
  def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer {
    plan transformUp {
      case View(None, _, _, child) => child
    }
  }
}

/**
* Removes [[Union]] operators from the plan if it just has one child.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// output, nor with the query column names, throw an AnalysisException.
// If the view's child output can't be up-cast to the view output,
// throw an AnalysisException, too.
case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
case v @ View(Some(desc), _, output, child) if child.resolved && !v.sameOutput(child) =>
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
if (output.length != queryColumnNames.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has different output attributes from the View operator. Add a Project over
// the child of the view.
case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
case v @ View(Some(desc), _, output, child) if child.resolved && !v.sameOutput(child) =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
Expand All @@ -81,9 +81,21 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
}
Project(newOutput, child)

case v @ View(None, _, output, child) if !v.sameOutput(child) =>
val queryOutput = child.output
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// `CheckAnalysis` already guarantees that the cast is an up-cast.
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
case (_, originAttr) => originAttr
}
Project(newOutput, child)

// The child should have the same output attributes as the View operator, so we simply
// remove the View operator.
case View(_, _, _, child) =>
case v @ View(_, _, _, child) =>
child
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ class SessionCatalog(
parser.parsePlan(viewText)
}
View(
desc = metadata,
desc = Some(metadata),
isTempView = isTempView,
output = metadata.schema.toAttributes,
child = viewPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
EliminateResolvedHint,
EliminateSubqueryAliases,
EliminateView,
EliminateDataFrameTempViews,
ReplaceExpressions,
RewriteNonCorrelatedExists,
ComputeCurrentTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,10 +456,11 @@ case class InsertIntoDir(
* `CatalogTable.viewText`, should throw an error if the `viewText` is not defined.
*/
case class View(
desc: CatalogTable,
desc: Option[CatalogTable],
isTempView: Boolean,
output: Seq[Attribute],
child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {
require(desc.isDefined || isTempView)

override def producedAttributes: AttributeSet = outputSet

Expand All @@ -470,7 +471,8 @@ case class View(
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
val viewIdent = desc.map{ d => s"${d.identifier}, "}.getOrElse("")
s"View ($viewIdent${output.mkString("[", ",", "]")})"
}

override def doCanonicalize(): LogicalPlan = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,11 +660,11 @@ class AnalysisSuite extends AnalysisTest with Matchers {
val batches = Batch("View", Once, EliminateView) :: Nil
}
val relation = LocalRelation(Symbol("a").int.notNull, Symbol("b").string)
val view = View(CatalogTable(
val view = View(Some(CatalogTable(
identifier = TableIdentifier("v1"),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))),
schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType))))),
isTempView = false,
output = Seq(Symbol("a").int, Symbol("b").string),
child = relation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {

// Look up a view.
catalog.setCurrentDatabase("default")
val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes,
val view = View(
desc = Some(metadata),
isTempView = false,
output = metadata.schema.toAttributes,
child = CatalystSqlParser.parsePlan(metadata.viewText.get))
comparePlans(catalog.lookupRelation(TableIdentifier("view1", Some("db3"))),
SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view1"), view))
Expand All @@ -666,7 +669,10 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
assert(metadata.viewText.isDefined)
assert(metadata.viewCatalogAndNamespace == Seq(CatalogManager.SESSION_CATALOG_NAME, "db2"))

val view = View(desc = metadata, isTempView = false, output = metadata.schema.toAttributes,
val view = View(
desc = Some(metadata),
isTempView = false,
output = metadata.schema.toAttributes,
child = CatalystSqlParser.parsePlan(metadata.viewText.get))
comparePlans(catalog.lookupRelation(TableIdentifier("view2", Some("db3"))),
SubqueryAlias(Seq(CatalogManager.SESSION_CATALOG_NAME, "db3", "view2"), view))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,15 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
plan: LogicalPlan,
cascade: Boolean,
blocking: Boolean = false): Unit = {
val qe = spark.sessionState.executePlan(plan)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed

val shouldRemove: LogicalPlan => Boolean =
if (cascade) {
_.find(_.sameResult(plan)).isDefined
_.find(_.sameResult(analyzedPlan)).isDefined
} else {
_.sameResult(plan)
_.sameResult(analyzedPlan)
}
val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
this.synchronized {
Expand All @@ -187,7 +191,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
// will keep it as it is. It means the physical plan has been re-compiled already in the
// other thread.
val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
cd.plan.find(_.sameResult(analyzedPlan)).isDefined && !cacheAlreadyLoaded
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,19 @@ case class CreateViewCommand(
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val catalogTable = prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child)
// If there is no sql text (e.g. from Dataset API), we will always store the analyzed plan
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(catalogTable)
TemporaryViewRelation(
prepareTemporaryView(
name,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child))
} else {
View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
assert(isTemporary)
View(None, isTemporary, aliasedPlan.output, aliasedPlan)
}
catalog.createTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (viewType == GlobalTempView) {
Expand All @@ -146,17 +147,18 @@ case class CreateViewCommand(
CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
val catalogTable = prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child)
val tableDefinition = if (!conf.storeAnalyzedPlanForView && originalText.nonEmpty) {
TemporaryViewRelation(catalogTable)
TemporaryViewRelation(
prepareTemporaryView(
viewIdent,
sparkSession,
analyzedPlan,
aliasedPlan.schema,
originalText,
child))
} else {
View(catalogTable, isTemporary, catalogTable.schema.toAttributes, child)
assert(isTemporary)
View(None, isTemporary, aliasedPlan.output, aliasedPlan)
}
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
} else if (catalog.tableExists(name)) {
Expand Down Expand Up @@ -495,8 +497,8 @@ object ViewHelper {
path: Seq[TableIdentifier],
viewIdent: TableIdentifier): Unit = {
plan match {
case v: View =>
val ident = v.desc.identifier
case v @ View(Some(desc), _, _, _) =>
val ident = desc.identifier
val newPath = path :+ ident
// If the table identifier equals `viewIdent`, the current view node is the same as the
// altered view: we have detected a view reference cycle and should throw an AnalysisException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
// same way as how a permanent view is handled. This also avoids a potential issue where a
// dependent view becomes invalid because of the above while its data is still cached.
val viewText = viewDef match {
case v: View => v.desc.viewText
case View(Some(desc), _, _, _) => desc.viewText
case _ => None
}
val plan = sparkSession.sessionState.executePlan(viewDef)
Expand Down

0 comments on commit 4b3f184

Please sign in to comment.