address comments and ut fix
xuanyuanking committed Apr 23, 2020
1 parent 81c9d47 commit c75fe08
Showing 4 changed files with 27 additions and 20 deletions.
@@ -238,13 +238,13 @@ class Analyzer(
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
ResolveTimeZone(conf) ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveHigherOrderFunctions(v1SessionCatalog) ::
ResolveLambdaVariables(conf) ::
ResolveTimeZone(conf) ::
ResolveRandomSeed ::
ResolveBinaryArithmetic(conf) ::
TypeCoercion.typeCoercionRules(conf) ++
@@ -1232,13 +1232,13 @@ class Analyzer(
/**
* Resolves the attribute and extract value expressions(s) by traversing the
* input expression in top down manner. The traversal is done in top-down manner as
* we need to skip over unbound lamda function expression. The lamda expressions are
* we need to skip over unbound lambda function expression. The lambda expressions are
* resolved in a different rule [[ResolveLambdaVariables]]
*
* Example :
* SELECT transform(array(1, 2, 3), (x, i) -> x + i)"
*
* In the case above, x and i are resolved as lamda variables in [[ResolveLambdaVariables]]
* In the case above, x and i are resolved as lambda variables in [[ResolveLambdaVariables]]
*
* Note : In this routine, the unresolved attributes are resolved from the input plan's
* children attributes.
@@ -2036,6 +2036,9 @@ class Analyzer(
*/
object ResolveAggregateFunctions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
// Resolve aggregate with having clause to Filter(..., Aggregate()). Note, to avoid wrongly
// resolve the having condition expression, here we skip resolving it in ResolveReferences
// and transform it to Filter after aggregate is resolved. See more details in SPARK-31519.
case AggregateWithHaving(cond, agg: Aggregate) if agg.resolved =>
resolveHaving(Filter(cond, agg), agg)

@@ -2138,8 +2141,8 @@
alias.toAttribute
// Grouping functions are handled in the rule [[ResolveGroupingAnalytics]].
case e: Expression if agg.groupingExpressions.exists(_.semanticEquals(e)) &&
!ResolveGroupingAnalytics.hasGroupingFunction(e) &&
!agg.output.exists(_.semanticEquals(e)) =>
!ResolveGroupingAnalytics.hasGroupingFunction(e) &&
!agg.output.exists(_.semanticEquals(e)) =>
e match {
case ne: NamedExpression =>
aggregateExpressions += ne
@@ -2596,7 +2599,7 @@ class Analyzer(

// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
case f @ AggregateWithHaving(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
if child.resolved &&
hasWindowFunction(aggregateExprs) &&
a.expressions.forall(_.resolved) =>
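As a rough sketch of the flow described in the new comment above: the having condition is left untouched until the child Aggregate is fully resolved, and only then is the wrapper rewritten into a Filter on top of the Aggregate, so the condition binds to the aggregate's output rather than being resolved early by ResolveReferences (SPARK-31519). The rule below is a simplified stand-in, not the literal code from this commit, and the import path for AggregateWithHaving is assumed from this commit's file layout.

    import org.apache.spark.sql.catalyst.analysis.AggregateWithHaving
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    // Simplified stand-in for the case added to ResolveAggregateFunctions above.
    object ResolveHavingSketch extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
        // Only rewrite once the child Aggregate is resolved, so the having
        // condition is checked against the aggregate's output.
        case AggregateWithHaving(cond, agg: Aggregate) if agg.resolved =>
          Filter(cond, agg) // the real rule goes through resolveHaving(Filter(cond, agg), agg)
      }
    }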
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.parser.ParserUtils
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
@@ -538,3 +538,14 @@ case class UnresolvedOrdinal(ordinal: Int)
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override lazy val resolved = false
}

/**
* Represents unresolved aggregate with having clause, it is turned by the analyzer into a Filter.
*/
case class AggregateWithHaving(
havingCondition: Expression,
child: Aggregate)
extends UnaryNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = child.output
}
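For illustration only, a small sketch (not from the commit) of how the new node behaves once constructed: it is always unresolved, which keeps the analyzer iterating until ResolveAggregateFunctions rewrites it, and its output simply delegates to the child Aggregate. The relation T(a, b) and the alias s are invented for the example, and the import path for AggregateWithHaving is assumed from this commit's file layout.

    import org.apache.spark.sql.catalyst.analysis.AggregateWithHaving
    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, GreaterThan, Literal}
    import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
    import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation}
    import org.apache.spark.sql.types.IntegerType

    // Made-up relation: T(a int, b int).
    val a = AttributeReference("a", IntegerType)()
    val b = AttributeReference("b", IntegerType)()
    val rel = LocalRelation(a, b)

    // SELECT a, sum(b) AS s FROM T GROUP BY a HAVING s > 0, before analysis.
    val s = Alias(Sum(b).toAggregateExpression(), "s")()
    val agg = Aggregate(Seq(a), Seq(a, s), rel)
    val havingPlan = AggregateWithHaving(GreaterThan(s.toAttribute, Literal(0)), agg)

    assert(!havingPlan.resolved)            // always unresolved, by design
    assert(havingPlan.output == agg.output) // exposes the child Aggregate's output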
@@ -362,9 +362,12 @@ package object dsl {
Aggregate(groupingExprs, aliasedExprs, logicalPlan)
}

def having(groupingExprs: Expression*)
(aggregateExprs: Expression*)(havingCondition: Expression): LogicalPlan = {
AggregateWithHaving(havingCondition, groupBy(groupingExprs: _*)(aggregateExprs: _*))
def having(
groupingExprs: Expression*)(
aggregateExprs: Expression*)(
havingCondition: Expression): LogicalPlan = {
AggregateWithHaving(havingCondition,
groupBy(groupingExprs: _*)(aggregateExprs: _*).asInstanceOf[Aggregate])
}

def window(
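A hedged usage sketch of the curried having helper above, in the style of a catalyst analysis test; the relation testRelation and the columns 'a and 'b are made up for the example.

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.dsl.plans._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    // Made-up test relation with two int columns.
    val testRelation = LocalRelation('a.int, 'b.int)

    // GROUP BY a, aggregate sum(b) AS s, then a having predicate on the aggregate.
    // The helper wraps the Aggregate in AggregateWithHaving(condition, aggregate),
    // which the analyzer later rewrites into Filter(condition, Aggregate(...)).
    val plan = testRelation.having('a)(sum('b).as("s"))(sum('b) > 1)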
@@ -583,16 +583,6 @@ case class Aggregate(
}
}

case class AggregateWithHaving(
havingCondition: Expression,
child: LogicalPlan)
extends UnaryNode {

override lazy val resolved: Boolean = false

override def output: Seq[Attribute] = child.output
}

case class Window(
windowExpressions: Seq[NamedExpression],
partitionSpec: Seq[Expression],
