Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31334][SQL] Don't ResolveReference/ResolveMissingReference when Filter condition with aggregate expression #28107

Original file line number Diff line number Diff line change
Expand Up @@ -1391,11 +1391,39 @@ class Analyzer(
notMatchedActions = newNotMatchedActions)
}

// When the filter condition is a HAVING condition, its columns have not been handled by
// TypeCoercion yet, so the condition may stay unresolved because of the aggregate
// function's checkInputDataType method. It then can't be handled by
// ResolveAggregateFunctions, which finally causes a column resolution error.
// In this situation, we don't resolve the condition's references here.
case f @ Filter(cond, agg @ Aggregate(_, _, _)) if containsAggregate(cond) => f

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}

def containsAggregate(e: Expression): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we reuse ResolveAggregateFunctions.containsAggregate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we reuse ResolveAggregateFunctions.containsAggregate?

Since the function here is still an UnresolvedFunction, we can't simply reuse it.

e.find {
// In the current loop, functions may still be unresolved,
// so we have to check here whether it is an aggregate function.
case func: UnresolvedFunction =>
try {
v1SessionCatalog.lookupFunction(func.name, func.arguments)
.isInstanceOf[AggregateFunction]
} catch {
// When the UnresolvedFunction is a UDF, we can't look up the function since
// its arguments are unresolved. If the lookup throws an exception,
// we assume that we don't deal with this situation right now;
// after the next loop, this function's arguments will be resolved,
// and then we can check whether this function is an aggregate function.
case _: Exception => true
}
case _ =>
false
}.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
}

def resolveAssignments(
assignments: Seq[Assignment],
mergeInto: MergeIntoTable,
Expand Down Expand Up @@ -1679,7 +1707,13 @@ class Analyzer(
Project(child.output, newSort)
}

case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
// When the filter condition is a HAVING condition, its columns have not been handled by
// TypeCoercion yet, so the condition may stay unresolved because of the aggregate
// function's checkInputDataType method. It then can't be handled by
// ResolveAggregateFunctions, which finally causes a column resolution error.
// In this situation, we don't resolve the condition's references here.
case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
&& (!child.isInstanceOf[Aggregate] || !ResolveReferences.containsAggregate(cond)) =>
val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
if (child.output == newChild.output) {
f.copy(condition = newCond.head)
Expand Down
22 changes: 22 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3494,6 +3494,28 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2))))
}

// Regression test for SPARK-31334: a HAVING clause that aggregates over a string-typed
// column must resolve even though the aggregate's alias (`a`) has the same name as the
// input column.
test("SPARK-31334: Don't ResolveReference/ResolveMissingReference when " +
  "Filter condition with aggregate expression") {
  // Register the data under a test-specific name inside withTempView so the view is
  // dropped when the test finishes and no same-named view used by other tests is
  // replaced (the original used the generic name "testData2" and never dropped it).
  withTempView("spark_31334_data") {
    // Column `a` holds strings on purpose; the expected sums below are doubles.
    Seq(
      ("1", 3),
      ("2", 3),
      ("3", 6),
      ("4", 7),
      ("5", 9),
      ("6", 9)
    ).toDF("a", "b").createOrReplaceTempView("spark_31334_data")

    // Groups b=3 (sum 3) and b=6 (sum 3) are filtered out by HAVING sum(a) > 3;
    // b=7 (sum 4) and b=9 (sum 11) remain.
    checkAnswer(sql(
      """
        | SELECT b, sum(a) as a
        | FROM spark_31334_data
        | GROUP BY b
        | HAVING sum(a) > 3
      """.stripMargin),
      Row(7, 4.0) :: Row(9, 11.0) :: Nil)
  }
}


test("SPARK-31242: clone SparkSession should respect sessionInitWithConfigDefaults") {
// Note, only the conf explicitly set in SparkConf(e.g. in SharedSparkSessionBase) would cause
// problem before the fix.
Expand Down