-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-41630][SQL] Support implicit lateral column alias resolution on Project #38776
Changes from 17 commits
04959c2
6f44c85
725e5ac
660e1d2
fd06094
7d4f80f
b9704d5
97ee293
5785943
757cffb
29de892
72991c6
d45fe31
1f55f73
f753529
b9f706f
94d5c9e
8d20986
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, Decorrela | |
import org.apache.spark.sql.catalyst.plans._ | ||
import org.apache.spark.sql.catalyst.plans.logical._ | ||
import org.apache.spark.sql.catalyst.trees.TreeNodeTag | ||
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION | ||
import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, UNRESOLVED_WINDOW_EXPRESSION} | ||
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils} | ||
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement} | ||
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} | ||
|
@@ -638,6 +638,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB | |
case UnresolvedWindowExpression(_, windowSpec) => | ||
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name) | ||
}) | ||
// This should not happen, resolved Project or Aggregate should restore or resolve | ||
// all lateral column alias references. Add check for extra safe. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we have a rule like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't add it intentionally. This is because I don't want those attributes actually can be resolve as LCA but to show in the error msg as UnresolvedAttribute. Also note that unlike RemoveTempResolvedColumn, LCARef can't be directly resolved to the NamedExpression inside of it because the plan won't be right - there is no alias push down. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this should not happen, we should throw an internal error |
||
projectList.foreach(_.transformDownWithPruning( | ||
_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) { | ||
case lcaRef: LateralColumnAliasReference if p.resolved => | ||
failUnresolvedAttribute( | ||
p, UnresolvedAttribute(lcaRef.nameParts), "UNRESOLVED_COLUMN") | ||
}) | ||
|
||
case j: Join if !j.duplicateResolved => | ||
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet) | ||
|
@@ -714,6 +722,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB | |
"operator" -> other.nodeName, | ||
"invalidExprSqls" -> invalidExprSqls.mkString(", "))) | ||
|
||
// This should not happen, resolved Project or Aggregate should restore or resolve | ||
// all lateral column alias references. Add check for extra safe. | ||
case agg @ Aggregate(_, aggList, _) | ||
if aggList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) && agg.resolved => | ||
aggList.foreach(_.transformDownWithPruning( | ||
_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) { | ||
case lcaRef: LateralColumnAliasReference => | ||
failUnresolvedAttribute( | ||
agg, UnresolvedAttribute(lcaRef.nameParts), "UNRESOLVED_COLUMN") | ||
}) | ||
|
||
case _ => // Analysis successful! | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalyst.analysis | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, LateralColumnAliasReference, NamedExpression} | ||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} | ||
import org.apache.spark.sql.catalyst.rules.Rule | ||
import org.apache.spark.sql.catalyst.trees.TreePattern.LATERAL_COLUMN_ALIAS_REFERENCE | ||
import org.apache.spark.sql.internal.SQLConf | ||
|
||
/** | ||
* This rule is the second phase to resolve lateral column alias. | ||
* | ||
* Resolve lateral column alias, which references the alias defined previously in the SELECT list. | ||
* Plan-wise, it handles two types of operators: Project and Aggregate. | ||
* - in Project, pushing down the referenced lateral alias into a newly created Project, resolve | ||
* the attributes referencing these aliases | ||
* - in Aggregate TODO. | ||
* | ||
* The whole process is generally divided into two phases: | ||
* 1) recognize resolved lateral alias, wrap the attributes referencing them with | ||
* [[LateralColumnAliasReference]] | ||
* 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]]. | ||
* For Project, it further resolves the attributes and push down the referenced lateral aliases. | ||
* For Aggregate, TODO | ||
* | ||
* Example for Project: | ||
* Before rewrite: | ||
* Project [age AS a, 'a + 1] | ||
* +- Child | ||
* | ||
* After phase 1: | ||
* Project [age AS a, lateralalias(a) + 1] | ||
* +- Child | ||
* | ||
* After phase 2: | ||
* Project [a, a + 1] | ||
* +- Project [child output, age AS a] | ||
* +- Child | ||
* | ||
* Example for Aggregate TODO | ||
* | ||
* | ||
* The name resolution priority: | ||
* local table column > local lateral column alias > outer reference | ||
* | ||
* Because lateral column alias has higher resolution priority than outer reference, it will try | ||
* to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an | ||
* [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with | ||
* [[LateralColumnAliasReference]]. | ||
*/ | ||
object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] { | ||
case class AliasEntry(alias: Alias, index: Int) | ||
|
||
override def apply(plan: LogicalPlan): LogicalPlan = { | ||
if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) { | ||
plan | ||
} else { | ||
// phase 2: unwrap | ||
plan.resolveOperatorsUpWithPruning( | ||
_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) { | ||
case p @ Project(projectList, child) if p.resolved | ||
&& projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) => | ||
var aliasMap = AttributeMap.empty[AliasEntry] | ||
val referencedAliases = collection.mutable.Set.empty[AliasEntry] | ||
def unwrapLCAReference(e: NamedExpression): NamedExpression = { | ||
e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) { | ||
case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) => | ||
val aliasEntry = aliasMap.get(lcaRef.a).get | ||
// If there is no chaining of lateral column alias reference, push down the alias | ||
// and unwrap the LateralColumnAliasReference to the NamedExpression inside | ||
// If there is chaining, don't resolve and save to future rounds | ||
if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) { | ||
referencedAliases += aliasEntry | ||
lcaRef.ne | ||
} else { | ||
lcaRef | ||
} | ||
case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.a) => | ||
// It shouldn't happen, but restore to unresolved attribute to be safe. | ||
UnresolvedAttribute(lcaRef.nameParts) | ||
}.asInstanceOf[NamedExpression] | ||
} | ||
val newProjectList = projectList.zipWithIndex.map { | ||
case (a: Alias, idx) => | ||
val lcaResolved = unwrapLCAReference(a) | ||
// Insert the original alias instead of rewritten one to detect chained LCA | ||
aliasMap += (a.toAttribute -> AliasEntry(a, idx)) | ||
lcaResolved | ||
case (e, _) => | ||
unwrapLCAReference(e) | ||
} | ||
|
||
if (referencedAliases.isEmpty) { | ||
p | ||
} else { | ||
val outerProjectList = collection.mutable.Seq(newProjectList: _*) | ||
val innerProjectList = | ||
collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*) | ||
referencedAliases.foreach { case AliasEntry(alias: Alias, idx) => | ||
outerProjectList.update(idx, alias.toAttribute) | ||
innerProjectList += alias | ||
} | ||
p.copy( | ||
projectList = outerProjectList.toSeq, | ||
child = Project(innerProjectList.toSeq, child) | ||
) | ||
} | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.