[SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal #35967

Closed
wants to merge 1 commit into branch-3.2 from mcdull-zhang/spark_38570_3.2

Conversation

mcdull-zhang
Contributor

This is a backport of #35878 to branch 3.2.

What changes were proposed in this pull request?

The return value of Literal.references is an empty AttributeSet, so a Literal join key is mistaken for a partition-column reference and a dynamic pruning filter is planned for it.
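
The pitfall is attribute-set containment: the pruning rule accepts a filter-side join key whose attribute references are contained in the scan's partition columns, and the empty set is a subset of every set. A minimal sketch of that trap, paste-able into spark-shell (illustrative only, built from catalyst classes; this is not the PR's patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

// A partition column of the pruned scan, e.g. store_id.
val storeId = AttributeReference("store_id", IntegerType)()
val partitionColumns = AttributeSet(storeId :: Nil)

// `4 AS store_id#5281` reduces the Union branch's join key to a literal.
val joinKey = Literal(4)

// Literal.references is an empty AttributeSet, and an empty set is a subset
// of any set, so a containment-based partition-column check wrongly passes:
assert(joinKey.references.subsetOf(partitionColumns))
```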

For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:

```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :              +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :                 +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                    +- *(1) Project [store_id#5291, state_province#5292]
   :  :                       +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                          +- *(1) ColumnarToRow
   :  :                             +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :                 +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```

After this PR, the spurious dynamic pruning filters on the literal join keys, along with the SubqueryBroadcast/ReusedSubquery nodes that evaluated them, are gone:

```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#326]
      +- *(3) Project [store_id#5291, state_province#5292]
         +- *(3) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
            +- *(3) ColumnarToRow
                +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache...., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4),EqualNullSafe(store_id,5))..., ReadSchema: struct<store_id:int,state_province:string,country:string>
```
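
For context, a query of roughly this shape reproduces the plans above. Everything here is reconstructed from the FileScan and Filter nodes, so treat it as a hypothetical sketch: it assumes parquet tables fact_sk and fact_stats partitioned by store_id, an unpartitioned dim_store, and AQE disabled as in the plans above; the PR's actual test SQL may differ.

```scala
import org.apache.spark.sql.SparkSession

object Spark38570Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // The plans shown in this PR were captured with AQE turned off.
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()

    // Literal join keys (4 and 5) on the Union side are what triggered the
    // bogus dynamicpruningexpression(... IN dynamicpruning#...) before the fix.
    spark.sql(
      """
        |SELECT f.store_id, f.date_id, s.state_province
        |FROM (
        |  SELECT 4 AS store_id, date_id FROM fact_sk    WHERE date_id >= 1300
        |  UNION ALL
        |  SELECT 5 AS store_id, date_id FROM fact_stats WHERE date_id <= 1000
        |) f
        |JOIN dim_store s ON f.store_id = s.store_id
        |WHERE s.country = 'US'
      """.stripMargin).explain()
  }
}
```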

Why are the changes needed?

Execution performance improvement: the planner no longer builds, broadcasts, and evaluates a pruning subquery that can never prune anything, because a literal identifies no partition column.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit test

@github-actions github-actions bot added the SQL label Mar 25, 2022
Contributor

@weixiuli weixiuli left a comment

LGTM

wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878 to branch 3.2.

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878 to branch 3.1.

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Mar 26, 2022
…iteral

This is a backport of #35878 to branch 3.0.

Closes #35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
@wangyum
Member

wangyum commented Mar 26, 2022

Merged to branch-3.2, branch-3.1 and branch-3.0.

@wangyum wangyum closed this Mar 26, 2022
kazuyukitanimura pushed a commit to kazuyukitanimura/spark that referenced this pull request Aug 10, 2022
…iteral

This is a backport of apache#35878 to branch 3.2.

Closes apache#35967 from mcdull-zhang/spark_38570_3.2.

Authored-by: mcdull-zhang <work4dong@163.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 8621914)