Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal #35878

Closed

Conversation

mcdull-zhang
Copy link
Contributor

@mcdull-zhang mcdull-zhang commented Mar 16, 2022

What changes were proposed in this pull request?

The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:

*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]

after this pr:

*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>

Why are the changes needed?

Execution performance improvement

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit test

@github-actions github-actions bot added the SQL label Mar 16, 2022
@HyukjinKwon HyukjinKwon changed the title [SPARK-38570][SQL]Incorrect DynamicPartitionPruning caused by Literal [SPARK-38570][SQL] Incorrect DynamicPartitionPruning caused by Literal Mar 17, 2022
@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@mcdull-zhang
Copy link
Contributor Author

friendly ping @cloud-fan


test("SPARK-38570: Fix incorrect DynamicPartitionPruning caused by Literal") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
withTable("fact1", "fact2", "dim") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have lots of built-in tables. Cloud we change the test query to:

SELECT f.store_id,
       f.date_id,
       s.state_province
FROM   (SELECT 4 AS store_id,
               date_id,
               product_id
        FROM   fact_sk
        WHERE  date_id >= 1300
        UNION ALL
        SELECT 5 AS store_id,
               date_id,
               product_id
        FROM   fact_stats
        WHERE  date_id <= 1000) f
       JOIN dim_store s
         ON f.store_id = s.store_id
WHERE  s.country = 'US'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So embarrassing, I wanted to use these built-in tables before, but I didn't see table fact_stats.

…essions/predicates.scala


code format

Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
@HyukjinKwon
Copy link
Member

cc @maryannxue too

…runingSuite.scala


code style

Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Copy link
Contributor

@weixiuli weixiuli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1

@cloud-fan
Copy link
Contributor

@mcdull-zhang can you re-trigger the test?

@mcdull-zhang mcdull-zhang deleted the literal_dynamic_partition branch March 24, 2022 14:13
@mcdull-zhang mcdull-zhang restored the literal_dynamic_partition branch March 24, 2022 14:13
@mcdull-zhang mcdull-zhang reopened this Mar 24, 2022
@wangyum wangyum closed this in 4c51851 Mar 25, 2022
wangyum pushed a commit that referenced this pull request Mar 25, 2022
### What changes were proposed in this pull request?

The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```

after this pr:
```text
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```

### Why are the changes needed?
Execution performance improvement

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test

Closes #35878 from mcdull-zhang/literal_dynamic_partition.

Lead-authored-by: mcdull-zhang <work4dong@163.com>
Co-authored-by: mcdull_zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 4c51851)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
@wangyum
Copy link
Member

wangyum commented Mar 25, 2022

Merged to master and branch-3.3.

@wangyum
Copy link
Member

wangyum commented Mar 25, 2022

@mcdull-zhang Could you backport this to branch-3.2.

@mcdull-zhang
Copy link
Contributor Author

@wangyum I have newly submitted a pr for 3.2.

thanks again all.

wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878  to branch 3.2.

### What changes were proposed in this pull request?
The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
after this pr:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```

### Why are the changes needed?
Execution performance improvement

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878  to branch 3.1.

The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
after this pr:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```

Execution performance improvement

No

Added unit test

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878  to branch 3.0.

The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```
after this pr:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```

Execution performance improvement

No

Added unit test

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…iteral

This is a backport of apache#35878  to branch 3.2.

### What changes were proposed in this pull request?
The return value of Literal.references is an empty AttributeSet, so Literal is mistaken for a partition column.

For example, the sql in the test case will generate such a physical plan when the adaptive is closed:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=apache#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#335]
```
after this pr:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=apache#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
               +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```

### Why are the changes needed?
Execution performance improvement

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test

Closes apache#35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants