
[SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions #24926

Conversation

BryanCutler
Member

What changes were proposed in this pull request?

When running `FlatMapGroupsInPandasExec` or `AggregateInPandasExec`, the shuffle uses the default of 200 partitions from "spark.sql.shuffle.partitions". If the data is small, e.g. in testing, many of the partitions will be empty but are processed just the same.

This PR checks that the `mapPartitionsInternal` iterator is non-empty before calling `ArrowPythonRunner` to start computation on the iterator.
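The idea behind the guard can be sketched in plain Python: peek at each partition's iterator, skip it if it yields nothing, and otherwise re-attach the peeked row before handing the iterator off. This is an illustrative sketch of the pattern, not the actual Scala change:

```python
from itertools import chain

def non_empty_partitions(partitions):
    """Yield only partitions whose iterator produces at least one row."""
    for part in partitions:
        it = iter(part)
        try:
            first = next(it)          # peek: consumes one element
        except StopIteration:
            continue                  # empty partition: skip it entirely
        yield chain([first], it)      # re-attach the peeked element

parts = [iter([]), iter([1, 2]), iter([]), iter([3])]
surviving = [list(p) for p in non_empty_partitions(parts)]
# only the two non-empty partitions survive
```

Empty partitions never reach the downstream consumer, which is what saves the per-partition startup cost of `ArrowPythonRunner`.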

How was this patch tested?

Existing tests. Ran the following benchmark on a simple example where most partitions are empty:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).count()
```
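As a side note, `normalize` standardizes each group using pandas's sample standard deviation (ddof=1). For the id=2 group above, the arithmetic can be checked without a Spark session (a plain-Python sketch):

```python
import math

v = [3.0, 5.0, 10.0]                     # the id=2 group from the DataFrame above
mean = sum(v) / len(v)                   # 6.0
# sample standard deviation (ddof=1), matching pandas's Series.std() default
std = math.sqrt(sum((x - mean) ** 2 for x in v) / (len(v) - 1))
normalized = [(x - mean) / std for x in v]
# standardized values sum to (approximately) zero by construction
```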

**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

@BryanCutler
Member Author

This probably won't have an impact if the data is not small, but given how many tests there are for these, it should hopefully reduce some testing time.

Strangely, AggregateInPandasExec did not show an improvement, although the code is very similar. I'll have to look at that one in more detail.

```scala
  (key, rows.map(prunedProj))
}
// Only execute on non-empty partitions
if (iter.nonEmpty) {
```
Member Author

The diff is a little off, this is really the only change

@BryanCutler
Member Author

cc @HyukjinKwon @icexelloss

@SparkQA

SparkQA commented Jun 21, 2019

Test build #106744 has finished for PR 24926 at commit 4eaef85.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@HyukjinKwon left a comment

Yea, I think I made the same change separately in the R vectorization as well. Looks good as is too. Although I am sure we have one, just to be sure, do we have a test that processes empty partitions?

@BryanCutler
Member Author

> do we have a test that processes empty partitions?

We have one for pandas scalar udfs, but not specifically for these. Although I believe a bunch of empty partitions are created during the shuffle, it would still be good to test starting with one. Let me go ahead and add that.
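Independent of the Spark-side test being added here, the UDF body itself can be sanity-checked against an empty group locally. This is a pandas-only sketch (no Spark session involved), not the new test in this PR:

```python
import pandas as pd

def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

# an empty frame with the same schema as the benchmark DataFrame
empty = pd.DataFrame({"id": pd.Series(dtype="int64"),
                      "v": pd.Series(dtype="float64")})
result = normalize(empty)
# no rows in, no rows out, and the columns are preserved
```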

@BryanCutler
Member Author

Looks like this cut almost 30s off the test time lol, happy Friday @shaneknapp!

GroupedMapPandasUDFTests:
Ran 17 tests in 53.236s (before) vs.
Ran 18 tests in 24.102s (after)

@SparkQA

SparkQA commented Jun 21, 2019

Test build #106779 has finished for PR 24926 at commit 9be0110.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@BryanCutler
Member Author

Thanks @dongjoon-hyun and @HyukjinKwon!

@BryanCutler BryanCutler deleted the pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128 branch June 24, 2019 17:15
@shaneknapp
Contributor

shaneknapp commented Jun 24, 2019 via email

kiku-jw pushed a commit to kiku-jw/spark that referenced this pull request Jun 26, 2019
Closes apache#24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>