Skip to content

Commit

Permalink
resolve conflict attributes in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngone51 committed Jan 6, 2020
1 parent e38964c commit de75edf
Showing 1 changed file with 25 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1043,7 +1043,7 @@ class Analyzer(
logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
s"between $left and $right")

right.collect {
val conflictPlans = right.collect {
// Handle base relations that might appear more than once.
case oldVersion: MultiInstanceRelation
if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
Expand Down Expand Up @@ -1089,27 +1089,31 @@ class Analyzer(
.nonEmpty =>
(oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
}
// Only handle first case, others will be fixed on the next pass.
.headOption match {
case None =>
/*
* No result implies that there is a logical plan node that produces new references
* that this rule cannot handle. When that is the case, there must be another rule
* that resolves these conflicts. Otherwise, the analysis will fail.
*/
right
case Some((oldRelation, newRelation)) =>
val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
right transformUp {
case r if r == oldRelation => newRelation
} transformUp {
case other => other transformExpressions {
case a: Attribute =>
dedupAttr(a, attributeRewrites)
case s: SubqueryExpression =>
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
}

/*
* Note that it's possible for conflictPlans to be empty while it implies that there
* is a logical plan node that produces new references that this rule cannot handle.
* When that is the case, there must be another rule that resolves these conflicts.
* Otherwise, the analysis will fail.
*/
if (conflictPlans.isEmpty) {
right
} else {
val attributeRewrites = AttributeMap(conflictPlans.flatMap {
case (oldRelation, newRelation) => oldRelation.output.zip(newRelation.output)})
val conflictPlanMap = conflictPlans.toMap
// transformDown so that we can replace all the old Relations in one turn due to
// the reason that conflictPlans are also collected in pre-order.
right transformDown {
case r if conflictPlanMap.contains(r) => conflictPlanMap(r)
} transformUp {
case other => other transformExpressions {
case a: Attribute =>
dedupAttr(a, attributeRewrites)
case s: SubqueryExpression =>
s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
}
}
}
}

Expand Down

0 comments on commit de75edf

Please sign in to comment.