[SPARK-41630][SQL] Support implicit lateral column alias resolution on Project #38776
Closed
18 commits
04959c2 refactor analyzer adding a new object (anchovYu)
6f44c85 lca code (anchovYu)
725e5ac add tests, refine logic (anchovYu)
660e1d2 move lca rule to a new file (anchovYu)
fd06094 rename conf (anchovYu)
7d4f80f test failure (anchovYu)
b9704d5 small fix (anchovYu)
97ee293 Merge remote-tracking branch 'apache/master' into SPARK-27561-refactor (anchovYu)
5785943 make changes to accomodate the recent refactor (anchovYu)
757cffb introduce leaf exp in Project as well (anchovYu)
29de892 handle a corner case (anchovYu)
72991c6 add more tests; add check rule (anchovYu)
d45fe31 uplift the necessity to resolve expression in second phase; add more … (anchovYu)
1f55f73 address comments to add tests for LCA off (anchovYu)
f753529 revert the refactor, split LCA into two rules (anchovYu)
b9f706f better refactor (anchovYu)
94d5c9e address comments (anchovYu)
8d20986 address comments (anchovYu)
135 changes: 135 additions & 0 deletions
...ain/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, LateralColumnAliasReference, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreePattern.LATERAL_COLUMN_ALIAS_REFERENCE
import org.apache.spark.sql.internal.SQLConf

/**
 * This rule is the second phase of lateral column alias resolution.
 *
 * A lateral column alias is a reference to an alias defined earlier in the same SELECT list.
 * Plan-wise, resolution handles two types of operators: Project and Aggregate.
 * - in Project, it pushes the referenced lateral aliases down into a newly created Project and
 *   resolves the attributes referencing these aliases
 * - in Aggregate TODO.
 *
 * The whole process is divided into two phases:
 * 1) recognize resolved lateral aliases and wrap the attributes referencing them with
 *    [[LateralColumnAliasReference]]
 * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
 *    For Project, it further resolves the attributes and pushes down the referenced lateral
 *    aliases.
 *    For Aggregate, TODO
 *
 * Example for Project:
 * Before rewrite:
 * Project [age AS a, 'a + 1]
 * +- Child
 *
 * After phase 1:
 * Project [age AS a, lateralalias(a) + 1]
 * +- Child
 *
 * After phase 2:
 * Project [a, a + 1]
 * +- Project [child output, age AS a]
 *    +- Child
 *
 * Example for Aggregate TODO
 *
 *
 * The name resolution priority:
 * local table column > local lateral column alias > outer reference
 *
 * Because a lateral column alias has higher resolution priority than an outer reference, phase 1
 * tries to resolve an [[OuterReference]] using lateral column aliases, just as it does for an
 * [[UnresolvedAttribute]]. If it succeeds, it strips the [[OuterReference]] and wraps the
 * attribute with [[LateralColumnAliasReference]].
 */
object ResolveLateralColumnAliasReference extends Rule[LogicalPlan] {
  case class AliasEntry(alias: Alias, index: Int)

  /**
   * A tag to store the nameParts from the original unresolved attribute.
   * It is set for [[OuterReference]], used in the current rule to convert [[OuterReference]] back
   * to [[LateralColumnAliasReference]].
   */
  val NAME_PARTS_FROM_UNRESOLVED_ATTR = TreeNodeTag[Seq[String]]("name_parts_from_unresolved_attr")

  override def apply(plan: LogicalPlan): LogicalPlan = {
    if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) {
      plan
    } else {
      // phase 2: unwrap
      plan.resolveOperatorsUpWithPruning(
        _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
        case p @ Project(projectList, child) if p.resolved
          && projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
          var aliasMap = AttributeMap.empty[AliasEntry]
          val referencedAliases = collection.mutable.Set.empty[AliasEntry]
          def unwrapLCAReference(e: NamedExpression): NamedExpression = {
            e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
              case lcaRef: LateralColumnAliasReference if aliasMap.contains(lcaRef.a) =>
                val aliasEntry = aliasMap.get(lcaRef.a).get
                // If there is no chaining of lateral column alias reference, push down the alias
                // and unwrap the LateralColumnAliasReference to the NamedExpression inside
                // If there is chaining, don't resolve and save to future rounds
                if (!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
                  referencedAliases += aliasEntry
                  lcaRef.ne
                } else {
                  lcaRef
                }
              case lcaRef: LateralColumnAliasReference if !aliasMap.contains(lcaRef.a) =>
                // It shouldn't happen, but restore to unresolved attribute to be safe.
                UnresolvedAttribute(lcaRef.nameParts)
            }.asInstanceOf[NamedExpression]
          }
          val newProjectList = projectList.zipWithIndex.map {
            case (a: Alias, idx) =>
              val lcaResolved = unwrapLCAReference(a)
              // Insert the original alias instead of rewritten one to detect chained LCA
              aliasMap += (a.toAttribute -> AliasEntry(a, idx))
              lcaResolved
            case (e, _) =>
              unwrapLCAReference(e)
          }

          if (referencedAliases.isEmpty) {
            p
          } else {
            val outerProjectList = collection.mutable.Seq(newProjectList: _*)
            val innerProjectList =
              collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]): _*)
            referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
              outerProjectList.update(idx, alias.toAttribute)
              innerProjectList += alias
            }
            p.copy(
              projectList = outerProjectList.toSeq,
              child = Project(innerProjectList.toSeq, child)
            )
          }
      }
    }
  }
}
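
The phase 2 rewrite in this file can be hard to follow without a running example, so here is a minimal sketch on a toy expression model. It does not use Catalyst: `Expr`, `Col`, `LcaRef`, `Add`, `Lit`, `Alias`, `Plan`, `Relation`, `Project`, and `LcaSketch.rewriteOnce` are all hypothetical names invented for illustration, and the toy resolves by name rather than by attribute id. It assumes phase 1 has already wrapped lateral references in `LcaRef`.

```scala
// Toy model only: none of these types are Spark's Catalyst classes.
sealed trait Expr
case class Col(name: String) extends Expr      // resolved column or alias attribute
case class LcaRef(name: String) extends Expr   // stands in for LateralColumnAliasReference
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan
case class Relation(output: Seq[String]) extends Plan
case class Project(projectList: Seq[Expr], child: Plan) extends Plan

object LcaSketch {
  def containsLca(e: Expr): Boolean = e match {
    case LcaRef(_)   => true
    case Add(l, r)   => containsLca(l) || containsLca(r)
    case Alias(c, _) => containsLca(c)
    case _           => false
  }

  // Output column names of a plan; unnamed expressions fall back to toString.
  def output(p: Plan): Seq[String] = p match {
    case Relation(out) => out
    case Project(list, _) => list.map {
      case Alias(_, n) => n
      case Col(n)      => n
      case other       => other.toString
    }
  }

  // One round of the phase 2 rewrite on a single Project.
  def rewriteOnce(p: Project): Project = {
    // Aliases seen so far in the SELECT list, keyed by name, with their position.
    val aliasesSoFar = scala.collection.mutable.Map.empty[String, (Alias, Int)]
    // Aliases that must be pushed down, in first-reference order.
    val referenced = scala.collection.mutable.LinkedHashMap.empty[String, (Alias, Int)]

    def unwrap(e: Expr): Expr = e match {
      case LcaRef(n) =>
        aliasesSoFar.get(n) match {
          case Some(entry @ (alias, _)) if !containsLca(alias) =>
            referenced(n) = entry
            Col(n)
          // Chained reference (or, in this toy, an unknown alias): keep for a later round.
          case _ => e
        }
      case Add(l, r)   => Add(unwrap(l), unwrap(r))
      case Alias(c, n) => Alias(unwrap(c), n)
      case other       => other
    }

    val newList: Seq[Expr] = p.projectList.zipWithIndex.map {
      case (a: Alias, idx) =>
        val rewritten = unwrap(a)
        // Record the original alias, not the rewritten one, so chaining stays detectable.
        aliasesSoFar(a.name) = (a, idx)
        rewritten
      case (e, _) => unwrap(e)
    }

    if (referenced.isEmpty) {
      p
    } else {
      // Outer Project: each pushed-down alias is replaced by a plain column reference.
      val outer = referenced.values.foldLeft(newList) { case (list, (alias, idx)) =>
        list.updated(idx, Col(alias.name))
      }
      // Inner Project: the child's output plus the pushed-down aliases.
      val childCols: Seq[Expr] = output(p.child).map(n => Col(n))
      val inner: Seq[Expr] = childCols ++ referenced.values.map(_._1)
      Project(outer, Project(inner, p.child))
    }
  }
}
```

Applied to the toy equivalent of `Project [age AS a, lateralalias(a) + 1]`, one call to `rewriteOnce` produces the "After phase 2" shape from the Scaladoc above. For a chain such as `age AS a, a + 1 AS b, b + 1 AS c`, the first call pushes down only `a` and leaves the reference to `b` wrapped; a second call on the result pushes down `b`. In the real rule, such later rounds presumably come from the analyzer re-running the rule until the plan stops changing.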
Shall we have a rule like RemoveTempResolvedColumn to restore LateralColumnAliasReference?
I intentionally didn't add it. I don't want attributes that can actually be resolved as LCA to show up in the error message as UnresolvedAttribute. Also note that, unlike RemoveTempResolvedColumn, an LCARef can't be resolved directly to the NamedExpression inside it, because the plan would be wrong: there would be no alias push-down.
If this should not happen, we should throw an internal error (SparkThrowable.internalError), so that it can include more debug information, instead of UNRESOLVED_COLUMN.
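
As a rough illustration of this suggestion (not the code that was merged, and not Spark's actual error API), failing fast on a missing alias could look like the sketch below. `PlanInvariantViolation` and `AliasLookup.aliasFor` are hypothetical names; in the rule itself the lookup is against `aliasMap` and the error would come from Spark's own internal-error helper.

```scala
// Hypothetical stand-in for an internal-error exception; not part of Spark.
final class PlanInvariantViolation(message: String) extends IllegalStateException(message)

object AliasLookup {
  // Returns the alias entry for `name`, or fails with debug context instead of
  // silently restoring an unresolved attribute.
  def aliasFor[A](aliases: Map[String, A], name: String, planDescription: String): A =
    aliases.getOrElse(name, throw new PlanInvariantViolation(
      s"No alias found for lateral column alias reference '$name' in plan: $planDescription"))
}
```

The trade-off raised in the thread is that an internal error carries the plan context for debugging, whereas falling back to an unresolved attribute surfaces to the user as an ordinary unresolved-column error.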