[SPARK-45065][PYTHON][PS] Support Pandas 2.1.0 #42793

Closed · wants to merge 33 commits

Changes from 6 commits

Commits (33)
bf79e7a
[SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
itholic Sep 4, 2023
e81a97a
Fix tests
itholic Sep 5, 2023
246d2a0
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 6, 2023
f874b85
Respect as_index=False when given funcs is a type of list
itholic Sep 6, 2023
49c5c5d
Apply the Pandas 2.1.0 changes
itholic Sep 7, 2023
7184a3b
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 7, 2023
15c5aa7
Fix ordering for stack
itholic Sep 8, 2023
5dbf456
Added migration guide
itholic Sep 8, 2023
2a17d1d
Deprecate all features from Pandas 2.1.0.
itholic Sep 11, 2023
f48215c
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 11, 2023
dc68af1
Fix linter
itholic Sep 12, 2023
4e89cf5
fix test
itholic Sep 12, 2023
a585fbe
Fix linter
itholic Sep 13, 2023
1831923
fix
zhengruifeng Sep 13, 2023
91b865c
resolve conflicts
itholic Sep 13, 2023
76433d0
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 13, 2023
afecab9
Retrigger the CI
itholic Sep 14, 2023
6ff4df2
replace the import
itholic Sep 14, 2023
5a0fe26
revert unnecess change
itholic Sep 14, 2023
bba34a0
fix linter
itholic Sep 14, 2023
f66d824
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 14, 2023
21a7dfe
Remove circular import
itholic Sep 15, 2023
2323237
resolve conflicts
itholic Sep 15, 2023
0c07f55
resolve conflicts
itholic Sep 15, 2023
5c054ec
resolve conflicts
itholic Sep 16, 2023
1cb9df4
do not call applymap
itholic Sep 16, 2023
0e8ea3b
fix linter
itholic Sep 16, 2023
46cd7dd
Recommend to use Pandas 2.0.0 and above
itholic Sep 16, 2023
357fbce
fix linter
itholic Sep 18, 2023
76f0720
resolve conflicts
itholic Sep 18, 2023
cf54c67
resolve conflicts
itholic Sep 18, 2023
0008089
Import
itholic Sep 18, 2023
5723b6c
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 18, 2023
4 changes: 2 additions & 2 deletions dev/infra/Dockerfile
@@ -64,8 +64,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
# See more in SPARK-39735
ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"

-RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
-RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+RUN pypy3 -m pip install numpy 'pandas<=2.1.0' scipy coverage matplotlib
+RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.0' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'

# Add Python deps for Spark Connect.
RUN python3.9 -m pip install grpcio protobuf googleapis-common-protos grpcio-status
9 changes: 8 additions & 1 deletion python/pyspark/pandas/groupby.py
@@ -311,7 +311,14 @@ def aggregate(
                i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf
            )
            if len(should_drop_index) > 0:
-                psdf = psdf.reset_index(level=should_drop_index, drop=True)
+                drop = not any(
+                    [
+                        isinstance(func_or_funcs[gkey.name], list)
+                        for gkey in self._groupkeys
+                        if gkey.name in func_or_funcs
+                    ]
+                )
+                psdf = psdf.reset_index(level=should_drop_index, drop=drop)
Comment on lines -308 to +315 (Contributor Author):
Bug fixed in Pandas: pandas-dev/pandas#52849.
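For reference, a minimal pandas-only sketch of the case this drop flag targets: an aggregation dict that maps a column to a list of functions, combined with as_index=False. The exact pre-/post-fix outputs are per pandas-dev/pandas#52849; treat the printed frames as illustrative.

import pandas as pd

pdf = pd.DataFrame({"A": [1, 1, 2], "B": [0.1, 0.2, 0.3]})

# A single func per column: as_index=False keeps "A" as a regular column.
print(pdf.groupby("A", as_index=False).agg({"B": "sum"}))

# A list of funcs per column: the case covered by pandas-dev/pandas#52849.
# Computing drop from the presence of list-valued funcs, as in the diff
# above, keeps the group key "A" as a column here as well.
print(pdf.groupby("A", as_index=False).agg({"B": ["sum", "min"]}))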

            if len(should_drop_index) < len(self._groupkeys):
                psdf = psdf.reset_index()

2 changes: 1 addition & 1 deletion python/pyspark/pandas/supported_api_gen.py
@@ -98,7 +98,7 @@ def generate_supported_api(output_rst_file_path: str) -> None:

    Write supported APIs documentation.
    """
-    pandas_latest_version = "2.0.3"
+    pandas_latest_version = "2.1.0"
    if LooseVersion(pd.__version__) != LooseVersion(pandas_latest_version):
        msg = (
            "Warning: Latest version of pandas (%s) is required to generate the documentation; "
5 changes: 1 addition & 4 deletions python/pyspark/pandas/tests/computation/test_corrwith.py
@@ -59,10 +59,7 @@ def _test_corrwith(self, psdf, psobj):
        # Therefore, we only test the pandas 1.5.0 in different way.
        # See https://github.com/pandas-dev/pandas/issues/48826 for the reported issue,
        # and https://github.com/pandas-dev/pandas/pull/46174 for the initial PR that causes.
-        if LooseVersion(pd.__version__) == LooseVersion("1.5.0") and isinstance(pobj, pd.Series):
-            methods = ["kendall"]
-        else:
-            methods = ["pearson", "spearman", "kendall"]
+        methods = ["pearson", "spearman", "kendall"]
        for method in methods:
            for drop in [True, False]:
                p_corr = pdf.corrwith(pobj, drop=drop, method=method)
5 changes: 3 additions & 2 deletions python/pyspark/pandas/tests/frame/test_reshaping.py
@@ -291,7 +291,8 @@ def test_stack(self):
        psdf_multi_level_cols2 = ps.from_pandas(pdf_multi_level_cols2)

        self.assert_eq(
-            psdf_multi_level_cols2.stack().sort_index(), pdf_multi_level_cols2.stack().sort_index()
+            psdf_multi_level_cols2.stack().sort_index()[["weight", "height"]],
+            pdf_multi_level_cols2.stack().sort_index()[["weight", "height"]],
        )

        pdf = pd.DataFrame(
@@ -304,7 +305,7 @@
        )
        psdf = ps.from_pandas(pdf)

-        self.assert_eq(psdf.stack().sort_index(), pdf.stack().sort_index())
+        self.assert_eq(psdf.stack().sort_index()[["x", "y"]], pdf.stack().sort_index()[["x", "y"]])
        self.assert_eq(psdf[[]].stack().sort_index(), pdf[[]].stack().sort_index(), almost=True)

    def test_unstack(self):
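The explicit column selections above pin a column order before comparison, since the column order of stack() output is not guaranteed to be identical across pandas versions. A hypothetical helper showing the same normalization pattern in plain pandas (assert_frames_equal_ignoring_order is illustrative, not part of this PR):

import pandas as pd

def assert_frames_equal_ignoring_order(left: pd.DataFrame, right: pd.DataFrame) -> None:
    # Sort rows by index and fix a deterministic column order on both sides,
    # so the comparison is insensitive to version-dependent ordering.
    cols = sorted(left.columns)
    pd.testing.assert_frame_equal(left.sort_index()[cols], right.sort_index()[cols])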
31 changes: 24 additions & 7 deletions python/pyspark/pandas/tests/test_stats.py
@@ -20,11 +20,6 @@
import numpy as np
import pandas as pd

-try:
-    from pandas._testing import makeMissingDataframe
-except ImportError:
-    from pandas.util.testing import makeMissingDataframe

from pyspark import pandas as ps
from pyspark.pandas.config import option_context
from pyspark.testing.pandasutils import PandasOnSparkTestCase, SPARK_CONF_ARROW_ENABLED
@@ -273,7 +268,18 @@ def test_skew_kurt_numerical_stability(self):
        self.assert_eq(psdf.kurt(), pdf.kurt(), almost=True)

    def test_dataframe_corr(self):
-        pdf = makeMissingDataframe(0.3, 42)
+        pdf = pd.DataFrame(
+            index=[
+                "".join(
+                    np.random.choice(
+                        list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10
+                    )
+                )
+                for _ in range(30)
+            ],
+            columns=list("ABCD"),
+            dtype="float64",
+        )
Comment on lines -276 to +282 (Contributor Author):
The testing util makeMissingDataframe has been removed, so the frame is now constructed inline.
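A minimal local stand-in for the removed helper, assuming its old semantics were "random float frame with a fraction of entries masked to NaN" and that the (0.3, 42) arguments were a missing-density and a seed (both assumptions; the inline construction in the diff above is what the test actually uses):

import numpy as np
import pandas as pd

def make_missing_dataframe(density: float = 0.3, seed: int = 42) -> pd.DataFrame:
    # 30x4 float frame with a random 10-char string index, where roughly
    # `density` of the entries are set to NaN.
    rng = np.random.default_rng(seed)
    chars = list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
    index = ["".join(rng.choice(chars, 10)) for _ in range(30)]
    values = rng.standard_normal((30, 4))
    values[rng.random((30, 4)) < density] = np.nan
    return pd.DataFrame(values, index=index, columns=list("ABCD"))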

        psdf = ps.from_pandas(pdf)

        with self.assertRaisesRegex(ValueError, "Invalid method"):
@@ -347,7 +353,18 @@ def test_dataframe_corr(self):
        )

    def test_series_corr(self):
-        pdf = makeMissingDataframe(0.3, 42)
+        pdf = pd.DataFrame(
+            index=[
+                "".join(
+                    np.random.choice(
+                        list("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"), 10
+                    )
+                )
+                for _ in range(30)
+            ],
+            columns=list("ABCD"),
+            dtype="float64",
+        )
        pser1 = pdf.A
        pser2 = pdf.B
        psdf = ps.from_pandas(pdf)
15 changes: 9 additions & 6 deletions python/pyspark/pandas/typedef/typehints.py
@@ -487,23 +487,23 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtypes
-    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)]
Comment (Contributor Author):
The dtype of the categories is now included in CategoricalDtype's __repr__: pandas-dev/pandas#52179.
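A quick way to see the repr change locally (assuming pandas >= 2.1.0 is installed):

import pandas as pd

dtype = pd.CategoricalDtype(categories=[3, 4, 5], ordered=False)
# pandas < 2.1.0:  CategoricalDtype(categories=[3, 4, 5], ordered=False)
# pandas >= 2.1.0: CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)
print(repr(dtype))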

    >>> inferred.spark_type
    StructType([StructField('c0', LongType(), True), StructField('c1', LongType(), True)])

    >>> def func() -> ps.DataFrame[zip(pdf.columns, pdf.dtypes)]:
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtypes
-    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)]
    >>> inferred.spark_type
    StructType([StructField('a', LongType(), True), StructField('b', LongType(), True)])

    >>> def func() -> ps.Series[pdf.b.dtype]:
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtype
-    CategoricalDtype(categories=[3, 4, 5], ordered=False)
+    CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)
    >>> inferred.spark_type
    LongType()

@@ -521,7 +521,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtypes
-    [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), dtype('int64'),
+    CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)]
    >>> inferred.spark_type.simpleString()
    'struct<__index_level_0__:bigint,c0:bigint,c1:bigint>'
    >>> inferred.index_fields
@@ -533,7 +534,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtypes
-    [CategoricalDtype(categories=[3, 4, 5], ordered=False), dtype('int64'), dtype('int64')]
+    [CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64),
+    dtype('int64'), dtype('int64')]
    >>> inferred.spark_type.simpleString()
    'struct<index:bigint,id:bigint,A:bigint>'
    >>> inferred.index_fields
@@ -544,7 +546,8 @@ def infer_return_type(f: Callable) -> Union[SeriesType, DataFrameType, ScalarTyp
    ...     pass
    >>> inferred = infer_return_type(func)
    >>> inferred.dtypes
-    [dtype('int64'), dtype('int64'), CategoricalDtype(categories=[3, 4, 5], ordered=False)]
+    [dtype('int64'), dtype('int64'),
+    CategoricalDtype(categories=[3, 4, 5], ordered=False, categories_dtype=int64)]
    >>> inferred.spark_type.simpleString()
    'struct<__index_level_0__:bigint,a:bigint,b:bigint>'
    >>> inferred.index_fields
@@ -166,7 +166,7 @@ def check_apply_in_pandas_not_returning_pandas_dataframe(self):
            fn=lambda lft, rgt: lft.size + rgt.size,
            error_class=PythonException,
            error_message_regex="Return type of the user-defined function "
-            "should be pandas.DataFrame, but is int64.",
+            "should be pandas.DataFrame, but is int.",
        )

    def test_apply_in_pandas_returning_column_names(self):
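The regex update tracks a change in what lft.size + rgt.size stringifies to under pandas 2.1 ("int" instead of "int64"). A hypothetical mirror of the guard being tested (the real check lives inside PySpark and surfaces as a PythonException; this sketch only shows how the offending type's name lands in the message):

import pandas as pd

def _verify_pandas_result(result: object) -> None:
    # applyInPandas requires the UDF to return a pandas.DataFrame; anything
    # else is rejected with the actual type's name in the error message.
    if not isinstance(result, pd.DataFrame):
        raise TypeError(
            "Return type of the user-defined function should be "
            f"pandas.DataFrame, but is {type(result).__name__}."
        )

_verify_pandas_result(pd.DataFrame({"a": [1]}))  # passes silently
try:
    _verify_pandas_result(3)
except TypeError as e:
    print(e)  # ... should be pandas.DataFrame, but is int.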