Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50091][SQL][3.5] Handle case of aggregates in left-hand operand of IN-subquery #49663

Conversation

bersprockets
Copy link
Contributor

What changes were proposed in this pull request?

This is a back-port of #48627.

This PR adds code to RewritePredicateSubquery#apply to explicitly handle the case where an Aggregate node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the Aggregate and into a parent Project node. The Aggregate will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, RewritePredicateSubquery#apply is called again to handle the Project as a UnaryNode. The Join will now be inserted between the Project and the Aggregate node, and the join condition will use an attribute rather than an aggregate expression, e.g.:

Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))https://github.com/apache/spark/pull/40]
+- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L)
   :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L]
   :  +- LocalRelation [col1#32, col2#33]
   +- LocalRelation [c2#39L]

sum(col2)#41L in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression.

Why are the changes needed?

The following query fails:

create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);

select col1, sum(col2) in (select c2 from v1)
from v2 group by col1;

It fails with this error:

[INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000

With SPARK_TESTING=1, it fails with this error:

[PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan:
Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))https://github.com/apache/spark/pull/19]
+- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L)
   :- LocalRelation [col1#11, col2#12]
   +- LocalRelation [c2#18L]

The issue is that RewritePredicateSubquery builds a Join operator where the join condition contains an aggregate expression.

The bug is in the handler for UnaryNode in RewritePredicateSubquery#apply, which adds a Join below the Aggregate and assumes that the left-hand operand of IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression.

This PR moves the offending IN-subqueries to a Project node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions.

Does this PR introduce any user-facing change?

No, other than allowing this type of query to succeed.

How was this patch tested?

New unit tests.

Was this patch authored or co-authored using generative AI tooling?

No.

…IN-subquery

This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle the case where an `Aggregate` node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the `Aggregate` and into a parent `Project` node. The `Aggregate` will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, `RewritePredicateSubquery#apply` is called again to handle the `Project` as a `UnaryNode`. The `Join` will now be inserted between the `Project` and the `Aggregate` node, and the join condition will use an attribute rather than an aggregate expression, e.g.:
```
Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))apache#40]
+- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L)
   :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L]
   :  +- LocalRelation [col1#32, col2#33]
   +- LocalRelation [c2#39L]
```
`sum(col2)#41L` in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression.

The following query fails:
```
create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);

select col1, sum(col2) in (select c2 from v1)
from v2 group by col1;
```
It fails with this error:
```
[INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000
```
With SPARK_TESTING=1, it fails with this error:
```
[PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan:
Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))apache#19]
+- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L)
   :- LocalRelation [col1#11, col2#12]
   +- LocalRelation [c2#18L]
```
The issue is that `RewritePredicateSubquery` builds a `Join` operator where the join condition contains an aggregate expression.

The bug is in the handler for `UnaryNode` in `RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and assumes that the left-hand operand of IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression.

This PR moves the offending IN-subqueries to a `Project` node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions.

No, other than allowing this type of query to succeed.

New unit tests.

No.

Closes apache#48627 from bersprockets/aggregate_in_set_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@github-actions github-actions bot added the SQL label Jan 25, 2025
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @bersprockets .
Merged to branch-3.5.

dongjoon-hyun pushed a commit that referenced this pull request Jan 25, 2025
…d of IN-subquery

### What changes were proposed in this pull request?

This is a back-port of #48627.

This PR adds code to `RewritePredicateSubquery#apply` to explicitly handle the case where an `Aggregate` node contains an aggregate expression in the left-hand operand of an IN-subquery expression. The explicit handler moves the IN-subquery expressions out of the `Aggregate` and into a parent `Project` node. The `Aggregate` will continue to perform the aggregations that were used as an operand to the IN-subquery expression, but will not include the IN-subquery expression itself. After pulling up IN-subquery expressions into a Project node, `RewritePredicateSubquery#apply` is called again to handle the `Project` as a `UnaryNode`. The `Join` will now be inserted between the `Project` and the `Aggregate` node, and the join condition will use an attribute rather than an aggregate expression, e.g.:
```
Project [col1#32, exists#42 AS (sum(col2) IN (listquery()))#40]
+- Join ExistenceJoin(exists#42), (sum(col2)#41L = c2#39L)
   :- Aggregate [col1#32], [col1#32, sum(col2#33) AS sum(col2)#41L]
   :  +- LocalRelation [col1#32, col2#33]
   +- LocalRelation [c2#39L]
```
`sum(col2)#41L` in the above join condition, despite how it looks, is the name of the attribute, not an aggregate expression.

### Why are the changes needed?

The following query fails:
```
create or replace temp view v1(c1, c2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);
create or replace temp view v2(col1, col2) as values (1, 2), (1, 3), (2, 2), (3, 7), (3, 1);

select col1, sum(col2) in (select c2 from v1)
from v2 group by col1;
```
It fails with this error:
```
[INTERNAL_ERROR] Cannot generate code for expression: sum(input[1, int, false]) SQLSTATE: XX000
```
With SPARK_TESTING=1, it fails with this error:
```
[PLAN_VALIDATION_FAILED_RULE_IN_BATCH] Rule org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery in batch RewriteSubquery generated an invalid plan: Special expressions are placed in the wrong plan:
Aggregate [col1#11], [col1#11, first(exists#20, false) AS (sum(col2) IN (listquery()))#19]
+- Join ExistenceJoin(exists#20), (sum(col2#12) = c2#18L)
   :- LocalRelation [col1#11, col2#12]
   +- LocalRelation [c2#18L]
```
The issue is that `RewritePredicateSubquery` builds a `Join` operator where the join condition contains an aggregate expression.

The bug is in the handler for `UnaryNode` in `RewritePredicateSubquery#apply`, which adds a `Join` below the `Aggregate` and assumes that the left-hand operand of IN-subquery can be used in the join condition. This works fine for most cases, but not when the left-hand operand is an aggregate expression.

This PR moves the offending IN-subqueries to a `Project` node, with the aggregates replaced by attributes referring to the aggregate expressions. The resulting join condition now uses those attributes rather than the actual aggregate expressions.

### Does this PR introduce _any_ user-facing change?

No, other than allowing this type of query to succeed.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49663 from bersprockets/aggregate_in_set_issue_br35.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants