From de75edf7652f0edbd6a2474010771730549e77ba Mon Sep 17 00:00:00 2001 From: "yi.wu" Date: Mon, 6 Jan 2020 17:27:57 +0800 Subject: [PATCH] resolve conflict attributes in batch --- .../sql/catalyst/analysis/Analyzer.scala | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f8751f9cfd7f0..e272ec340d746 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1043,7 +1043,7 @@ class Analyzer( logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " + s"between $left and $right") - right.collect { + val conflictPlans = right.collect { // Handle base relations that might appear more than once. case oldVersion: MultiInstanceRelation if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => @@ -1089,27 +1089,31 @@ class Analyzer( .nonEmpty => (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions))) } - // Only handle first case, others will be fixed on the next pass. - .headOption match { - case None => - /* - * No result implies that there is a logical plan node that produces new references - * that this rule cannot handle. When that is the case, there must be another rule - * that resolves these conflicts. Otherwise, the analysis will fail. 
- */ - right - case Some((oldRelation, newRelation)) => - val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) - right transformUp { - case r if r == oldRelation => newRelation - } transformUp { - case other => other transformExpressions { - case a: Attribute => - dedupAttr(a, attributeRewrites) - case s: SubqueryExpression => - s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) - } + + /* + * Note that conflictPlans can be empty, which implies that there is a logical plan node + * that produces new references that this rule cannot handle. When that is the case, + * there must be another rule that resolves these conflicts. Otherwise, the analysis + * will fail. + */ + if (conflictPlans.isEmpty) { + right + } else { + val attributeRewrites = AttributeMap(conflictPlans.flatMap { + case (oldRelation, newRelation) => oldRelation.output.zip(newRelation.output)}) + val conflictPlanMap = conflictPlans.toMap + // Use transformDown so that all the old relations can be replaced in one pass, since + // conflictPlans are also collected in pre-order. + right transformDown { + case r if conflictPlanMap.contains(r) => conflictPlanMap(r) + } transformUp { + case other => other transformExpressions { + case a: Attribute => + dedupAttr(a, attributeRewrites) + case s: SubqueryExpression => + s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) } + } } }