[SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
## What changes were proposed in this pull request?

When running FlatMapGroupsInPandasExec or AggregateInPandasExec, the shuffle uses the default of 200 partitions from `spark.sql.shuffle.partitions`. If the data is small, e.g. in testing, many of those partitions will be empty but are still processed just the same.
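
For reference, that partition count is a runtime conf that can be inspected, or lowered as a workaround when the data is known to be small; a minimal sketch, assuming an active `spark` session:

```python
# Inspect the shuffle partition count used for the groupBy shuffle (defaults to 200).
spark.conf.get("spark.sql.shuffle.partitions")

# Lowering it reduces how many (mostly empty) partitions small data is spread over;
# the change in this PR avoids the per-empty-partition cost without touching the conf.
spark.conf.set("spark.sql.shuffle.partitions", "8")
```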

This PR checks that the iterator given to `mapPartitionsInternal` is non-empty before calling `ArrowPythonRunner` to start computation on the iterator.
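
To illustrate the idea in Python (the actual change is in the Scala execs shown in the diff below), this sketch probes the partition iterator first and returns immediately when it is empty, so the per-partition setup, a stand-in here for starting an `ArrowPythonRunner`, is only paid for non-empty partitions; the `spark` session in the usage comment is assumed:

```python
from itertools import chain

def process_partition(rows):
    """Skip all per-partition work when the partition is empty."""
    rows = iter(rows)
    try:
        first = next(rows)       # probe: is this partition empty?
    except StopIteration:
        return iter([])          # empty partition -> no setup, no computation

    def transform(r):
        # stand-in for the expensive per-partition setup the PR avoids
        return r * 2

    # Non-empty partition: process the probed row together with the rest.
    return (transform(r) for r in chain([first], rows))

# Usage sketch: 3 rows spread over 200 slices leaves most partitions empty.
# rdd = spark.sparkContext.parallelize([1, 2, 3], numSlices=200)
# rdd.mapPartitions(process_partition).collect()  # -> [2, 4, 6]
```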

## How was this patch tested?

Existing tests. Ran the following benchmark, a simple example where most partitions are empty:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(
     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
     ("id", "v"))

pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).count()
```

**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

Closes apache#24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
BryanCutler authored and kiku-jw committed Jun 26, 2019
1 parent c017e78 commit 5029adf
Showing 4 changed files with 31 additions and 4 deletions.
13 changes: 13 additions & 0 deletions python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -18,6 +18,7 @@
import unittest

from pyspark.rdd import PythonEvalType
from pyspark.sql import Row
from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
@@ -461,6 +462,18 @@ def test_register_vectorized_udf_basic(self):
expected = [1, 5]
self.assertEqual(actual, expected)

def test_grouped_with_empty_partition(self):
data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
expected = [Row(id=1, sum=5), Row(id=2, x=4)]
num_parts = len(data) + 1
df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))

f = pandas_udf(lambda x: x.sum(),
'int', PandasUDFType.GROUPED_AGG)

result = df.groupBy('id').agg(f(df['x']).alias('sum')).collect()
self.assertEqual(result, expected)


if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_grouped_agg import *
12 changes: 12 additions & 0 deletions python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -504,6 +504,18 @@ def test_mixed_scalar_udfs_followed_by_grouby_apply(self):

self.assertEquals(result.collect()[0]['sum'], 165)

def test_grouped_with_empty_partition(self):
data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
num_parts = len(data) + 1
df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))

f = pandas_udf(lambda pdf: pdf.assign(x=pdf['x'].sum()),
'id long, x int', PandasUDFType.GROUPED_MAP)

result = df.groupBy('id').apply(f).collect()
self.assertEqual(result, expected)


if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_grouped_map import *
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -105,7 +105,8 @@ case class AggregateInPandasExec(
StructField(s"_$i", dt)
})

inputRDD.mapPartitionsInternal { iter =>
// Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
val prunedProj = UnsafeProjection.create(allInputs, child.output)

val grouped = if (groupingExpressions.isEmpty) {
@@ -151,6 +152,6 @@ case class AggregateInPandasExec(
val joinedRow = joined(leftRow, aggOutputRow)
resultProj(joinedRow)
}
}
}}
}
}
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -125,7 +125,8 @@ case class FlatMapGroupsInPandasExec(
val dedupAttributes = nonDupGroupingAttributes ++ dataAttributes
val dedupSchema = StructType.fromAttributes(dedupAttributes)

inputRDD.mapPartitionsInternal { iter =>
// Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
val grouped = if (groupingAttributes.isEmpty) {
Iterator(iter)
} else {
@@ -156,6 +157,6 @@ case class FlatMapGroupsInPandasExec(
flattenedBatch.setNumRows(batch.numRows())
flattenedBatch.rowIterator.asScala
}.map(unsafeProj)
}
}}
}
}
