[SPARK-45065][PYTHON][PS] Support Pandas 2.1.0 #42793

Closed · wants to merge 33 commits

Commits (33)
bf79e7a
[SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
itholic Sep 4, 2023
e81a97a
Fix tests
itholic Sep 5, 2023
246d2a0
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 6, 2023
f874b85
Respect as_index=False when given funcs is a type of list
itholic Sep 6, 2023
49c5c5d
Apply the Pandas 2.1.0 changes
itholic Sep 7, 2023
7184a3b
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 7, 2023
15c5aa7
Fix ordering for stack
itholic Sep 8, 2023
5dbf456
Added migration guide
itholic Sep 8, 2023
2a17d1d
Deprecate all features from Pandas 2.1.0.
itholic Sep 11, 2023
f48215c
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 11, 2023
dc68af1
Fix linter
itholic Sep 12, 2023
4e89cf5
fix test
itholic Sep 12, 2023
a585fbe
Fix linter
itholic Sep 13, 2023
1831923
fix
zhengruifeng Sep 13, 2023
91b865c
resolve conflicts
itholic Sep 13, 2023
76433d0
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 13, 2023
afecab9
Retrigger the CI
itholic Sep 14, 2023
6ff4df2
replace the import
itholic Sep 14, 2023
5a0fe26
revert unnecess change
itholic Sep 14, 2023
bba34a0
fix linter
itholic Sep 14, 2023
f66d824
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 14, 2023
21a7dfe
Remove circular import
itholic Sep 15, 2023
2323237
resolve conflicts
itholic Sep 15, 2023
0c07f55
resolve conflicts
itholic Sep 15, 2023
5c054ec
resolve conflicts
itholic Sep 16, 2023
1cb9df4
do not call applymap
itholic Sep 16, 2023
0e8ea3b
fix linter
itholic Sep 16, 2023
46cd7dd
Recommend to use Pandas 2.0.0 and above
itholic Sep 16, 2023
357fbce
fix linter
itholic Sep 18, 2023
76f0720
resolve conflicts
itholic Sep 18, 2023
cf54c67
resolve conflicts
itholic Sep 18, 2023
0008089
Import
itholic Sep 18, 2023
5723b6c
Merge branch 'master' of https://github.com/apache/spark into pandas_…
itholic Sep 18, 2023
4 changes: 2 additions & 2 deletions dev/infra/Dockerfile
@@ -84,8 +84,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', version='7.2.0', repos='ht
# See more in SPARK-39735
ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"

RUN pypy3 -m pip install numpy 'pandas<=2.0.3' scipy coverage matplotlib
RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.0.3' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
RUN pypy3 -m pip install numpy 'pandas<=2.1.0' scipy coverage matplotlib
RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.0' scipy unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 'memory-profiler==0.60.0' 'scikit-learn==1.1.*'

# Add Python deps for Spark Connect.
RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
3 changes: 3 additions & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -22,6 +22,7 @@ Upgrading PySpark
Upgrading from PySpark 3.5 to 4.0
---------------------------------

* In Spark 4.0, it is recommended to use Pandas version 2.0.0 or above with PySpark for optimal compatibility.
* In Spark 4.0, the minimum supported version for Pandas has been raised from 1.0.5 to 1.4.4 in PySpark.
* In Spark 4.0, the minimum supported version for Numpy has been raised from 1.15 to 1.21 in PySpark.
* In Spark 4.0, ``Int64Index`` and ``Float64Index`` have been removed from pandas API on Spark, ``Index`` should be used directly.
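A quick, illustrative way to check an environment against the version recommendation above (sketch only; package names as published on PyPI):

import pandas as pd
import pyspark

print(pd.__version__)       # recommended: 2.0.0 or above for Spark 4.0
print(pyspark.__version__)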
@@ -44,6 +45,8 @@ Upgrading from PySpark 3.5 to 4.0
* In Spark 4.0, ``squeeze`` parameter from ``ps.read_csv`` and ``ps.read_excel`` has been removed from pandas API on Spark.
* In Spark 4.0, ``null_counts`` parameter from ``DataFrame.info`` has been removed from pandas API on Spark, use ``show_counts`` instead.
* In Spark 4.0, the result of ``MultiIndex.append`` does not keep the index names from pandas API on Spark.
Contributor comment:

Can we add a line here where we tell users to have Pandas version 2.1.0 installed for Spark 4.0?
The only way now to find which Pandas version to install is to check the Dockerfile in dev/infra:

https://github.com/jupyter/docker-stacks/blob/52a999a554fe42951e017f7be132d808695a1261/images/pyspark-notebook/Dockerfile#L69

Contributor Author reply:

Good idea. Related information has been added to the top of the migration guide. Thanks!

* In Spark 4.0, ``DataFrameGroupBy.agg`` respects ``as_index=False`` when a list of functions is given, from pandas API on Spark.
* In Spark 4.0, ``DataFrame.stack`` guarantees the order of the existing columns instead of sorting them lexicographically, from pandas API on Spark.
* In Spark 4.0, ``True`` or ``False`` to ``inclusive`` parameter from ``Series.between`` has been removed from pandas API on Spark, use ``both`` or ``neither`` instead respectively (see the sketch after this list).
* In Spark 4.0, ``Index.asi8`` has been removed from pandas API on Spark, use ``Index.astype`` instead.
* In Spark 4.0, ``Index.is_type_compatible`` has been removed from pandas API on Spark, use ``Index.isin`` instead.
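A minimal sketch of the ``Series.between`` change above (illustrative data; assumes a Spark session with this patch applied):

import pyspark.pandas as ps

psser = ps.Series([1, 2, 3, 4])

# inclusive=True/False has been removed; pass "both" / "neither" instead:
psser.between(2, 3, inclusive="both")     # was: inclusive=True
psser.between(2, 3, inclusive="neither")  # was: inclusive=False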
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.pandas/frame.rst
@@ -127,6 +127,7 @@ Function application, GroupBy & Window

DataFrame.apply
DataFrame.applymap
DataFrame.map
DataFrame.pipe
DataFrame.agg
DataFrame.aggregate
2 changes: 0 additions & 2 deletions python/pyspark/pandas/base.py
@@ -978,7 +978,6 @@ def isnull(self: IndexOpsLike) -> IndexOpsLike:
NA values, such as None or numpy.NaN, get mapped to True values.
Everything else gets mapped to False values. Characters such as empty strings '' or
numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).

Returns
-------
@@ -1012,7 +1011,6 @@ def notnull(self: IndexOpsLike) -> IndexOpsLike:
Return a boolean same-sized object indicating if the values are not NA.
Non-missing values get mapped to True.
Characters such as empty strings '' or numpy.inf are not considered NA values
(unless you set pandas.options.mode.use_inf_as_na = True).
NA values, such as None or numpy.NaN, get mapped to False values.

Returns
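For context on the docstring change above, a small sketch of how ``isnull``/``notnull`` treat infinity versus missing values (illustrative data; expected results follow pandas, where ``use_inf_as_na`` was deprecated):

import numpy as np
import pyspark.pandas as ps

psser = ps.Series([1.0, None, np.inf])
psser.isnull()   # expected: False, True, False  (np.inf is not NA)
psser.notnull()  # expected: True, False, True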
143 changes: 135 additions & 8 deletions python/pyspark/pandas/frame.py
@@ -1271,6 +1271,8 @@ def applymap(self, func: Callable[[Any], Any]) -> "DataFrame":
This method applies a function that accepts and returns a scalar
to every element of a DataFrame.

.. deprecated:: 4.0.0

.. note:: this API executes the function once to infer the type which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.
@@ -1321,7 +1323,74 @@ def applymap(self, func: Callable[[Any], Any]) -> "DataFrame":
           0          1
0   1.000000   4.494400
1  11.262736  20.857489
"""
warnings.warn(
"DataFrame.applymap has been deprecated. Use DataFrame.map instead", FutureWarning
)

# TODO: We can implement shortcut theoretically since it creates new DataFrame
# anyway and we don't have to worry about operations on different DataFrames.
return self.map(func=func)

def map(self, func: Callable[[Any], Any]) -> "DataFrame":
"""
Apply a function to a Dataframe elementwise.

This method applies a function that accepts and returns a scalar
to every element of a DataFrame.

.. versionadded:: 4.0.0
DataFrame.applymap was deprecated and renamed to DataFrame.map.

.. note:: this API executes the function once to infer the type which is
potentially expensive, for instance, when the dataset is created after
aggregations or sorting.

To avoid this, specify return type in ``func``, for instance, as below:

>>> def square(x) -> np.int32:
... return x ** 2

pandas-on-Spark uses return type hints and does not try to infer the type.

Parameters
----------
func : callable
Python function returns a single value from a single value.

Returns
-------
DataFrame
Transformed DataFrame.

Examples
--------
>>> df = ps.DataFrame([[1, 2.12], [3.356, 4.567]])
>>> df
       0      1
0  1.000  2.120
1  3.356  4.567

>>> def str_len(x) -> int:
... return len(str(x))
>>> df.map(str_len)
   0  1
0  3  4
1  5  5

>>> def power(x) -> float:
... return x ** 2
>>> df.map(power)
           0          1
0   1.000000   4.494400
1  11.262736  20.857489

You can omit type hints and let pandas-on-Spark infer its type.

>>> df.map(lambda x: x ** 2)
           0          1
0   1.000000   4.494400
1  11.262736  20.857489
"""
# TODO: We can implement shortcut theoretically since it creates new DataFrame
# anyway and we don't have to worry about operations on different DataFrames.
return self._apply_series_op(lambda psser: psser.apply(func))
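From a user's perspective, the rename above amounts to the following (illustrative frame; ``applymap`` keeps working but warns):

import pyspark.pandas as ps

df = ps.DataFrame([[1, 2.12], [3.356, 4.567]])

df.map(lambda x: x ** 2)       # new name in Spark 4.0
df.applymap(lambda x: x ** 2)  # same result, but emits a FutureWarning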
@@ -5556,6 +5625,10 @@ def from_records(
Parameters
----------
data : ndarray (structured dtype), list of tuples, dict, or DataFrame

.. deprecated:: 4.0.0
Passing a DataFrame is deprecated.

index : string, list of fields, array-like
Field of array to use as the index, alternately a specific set of input labels to use
exclude : sequence, default None
@@ -5952,6 +6025,9 @@ def fillna(
Method to use for filling holes in reindexed Series.
pad / ffill: propagate last valid observation forward to next valid.
backfill / bfill: use NEXT valid observation to fill gap.

.. deprecated:: 4.0.0

axis : {0 or `index`}
1 and `columns` are not supported.
inplace : boolean, default False
@@ -5963,6 +6039,8 @@
this is the maximum number of entries along the entire axis where NaNs will be filled.
Must be greater than 0 if not None

.. deprecated:: 4.0.0

Returns
-------
DataFrame
@@ -6046,6 +6124,11 @@ def op(psser: ps.Series) -> ps.Series:
return psser._fillna(value=value, method=method, axis=axis, limit=limit)

elif method is not None:
warnings.warn(
"DataFrame.fillna with 'method' is deprecated and will raise in a future version. "
"Use DataFrame.ffill() or DataFrame.bfill() instead.",
FutureWarning,
)

def op(psser: ps.Series) -> ps.Series:
return psser._fillna(value=value, method=method, axis=axis, limit=limit)
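A sketch of the migration the new warning suggests (illustrative data):

import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1.0, None, 3.0]})

psdf.fillna(method="ffill")  # deprecated: emits a FutureWarning in 4.0
psdf.ffill()                 # preferred replacement
psdf.bfill()                 # preferred replacement for method="bfill"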
@@ -6121,6 +6204,21 @@ def replace(
If value is a list or tuple, value should be of the same length with to_replace.
inplace : boolean, default False
Fill in place (do not create a new object)
limit : int, default None
Maximum size gap to forward or backward fill.

.. deprecated:: 4.0.0

regex : bool or str, default False
Whether to interpret to_replace and/or value as regular expressions.
If this is True then to_replace must be a string.
Alternatively, this could be a regular expression in which case to_replace must be None.
method : 'pad', default None
The method to use for replacement, when to_replace is a scalar,
list or tuple and value is None.

.. deprecated:: 4.0.0


Returns
-------
@@ -6189,8 +6287,18 @@ def replace(
3 Hulk Smash
"""
if method != "pad":
warnings.warn(
"The 'method' keyword in DataFrame.replace is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
raise NotImplementedError("replace currently works only for method='pad")
if limit is not None:
warnings.warn(
"The 'limit' keyword in DataFrame.replace is deprecated "
"and will be removed in a future version.",
FutureWarning,
)
raise NotImplementedError("replace currently works only when limit=None")
if regex is not False:
raise NotImplementedError("replace currently doesn't supports regex")
@@ -6221,6 +6329,13 @@ def op(psser: ps.Series) -> ps.Series:
return psser

else:
if value is None:
warnings.warn(
"DataFrame.replace without 'value' and with non-dict-like 'to_replace' "
"is deprecated and will raise in a future version. "
"Explicitly specify the new values instead.",
FutureWarning,
)

def op(psser: ps.Series) -> ps.Series:
return psser.replace(to_replace=to_replace, value=value, regex=regex)
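A sketch of the ``replace`` call pattern the new warning flags (illustrative data):

import pyspark.pandas as ps

psdf = ps.DataFrame({"name": ["Ironman", "Captain America"]})

# Deprecated: omitting `value` with a scalar `to_replace` now warns.
# Explicitly specify the new value instead:
psdf.replace(to_replace="Ironman", value="Tony Stark")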
@@ -6344,6 +6459,8 @@ def last(self, offset: Union[str, DateOffset]) -> "DataFrame":
When having a DataFrame with dates as index, this function can
select the last few rows based on a date offset.

.. deprecated:: 4.0.0

Parameters
----------
offset : str or DateOffset
@@ -6383,6 +6500,11 @@ def last(self, offset: Union[str, DateOffset]) -> "DataFrame":
3 observed days in the dataset, and therefore data for 2018-04-11 was
not returned.
"""
warnings.warn(
"last is deprecated and will be removed in a future version. "
"Please create a mask and filter using `.loc` instead",
FutureWarning,
)
# Check that the index is a DatetimeIndex
if not isinstance(self.index, ps.DatetimeIndex):
raise TypeError("'last' only supports a DatetimeIndex")
@@ -6401,6 +6523,8 @@ def first(self, offset: Union[str, DateOffset]) -> "DataFrame":
When having a DataFrame with dates as index, this function can
select the first few rows based on a date offset.

.. deprecated:: 4.0.0

Parameters
----------
offset : str or DateOffset
@@ -6440,6 +6564,11 @@ def first(self, offset: Union[str, DateOffset]) -> "DataFrame":
3 observed days in the dataset, and therefore data for 2018-04-13 was
not returned.
"""
warnings.warn(
"first is deprecated and will be removed in a future version. "
"Please create a mask and filter using `.loc` instead",
FutureWarning,
)
# Check that the index is a DatetimeIndex
if not isinstance(self.index, ps.DatetimeIndex):
raise TypeError("'first' only supports a DatetimeIndex")
@@ -10527,12 +10656,12 @@ def stack(self) -> DataFrameOrSeries:
kg m
cat 1.0 2.0
dog 3.0 4.0
>>> df_multi_level_cols2.stack().sort_index()  # doctest: +SKIP
        height  weight
cat kg     NaN     1.0
    m      2.0     NaN
dog kg     NaN     3.0
    m      4.0     NaN
Contributor Author comment (itholic, Sep 8, 2023):

Column ordering bug is fixed in Pandas: pandas-dev/pandas#53786.

>>> df_multi_level_cols2.stack().sort_index()
        weight  height
cat kg     1.0     NaN
    m      NaN     2.0
dog kg     3.0     NaN
    m      NaN     4.0
"""
from pyspark.pandas.series import first_series

@@ -10558,8 +10687,6 @@ def stack(self) -> DataFrameOrSeries:

index_values.add(value)

column_labels = dict(sorted(column_labels.items(), key=lambda x: x[0]))

index_name = self._internal.column_label_names[-1]
column_label_names = self._internal.column_label_names[:-1]
if len(column_label_names) == 0:
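A hedged illustration of what removing the ``sorted`` call above changes (illustrative columns, deliberately not in lexicographic order):

import pyspark.pandas as ps

psdf = ps.DataFrame([[1.0, 2.0]], columns=["weight", "height"])

psdf.stack()
# Spark 3.5 sorted the stacked labels lexicographically ("height" first);
# with this change the original column order ("weight" first) is kept,
# matching pandas 2.1 (pandas-dev/pandas#53786).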
42 changes: 42 additions & 0 deletions python/pyspark/pandas/generic.py
@@ -1317,6 +1317,13 @@ def sum(
>>> df['b'].sum(min_count=3)
nan
"""
if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.sum with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -1418,6 +1425,13 @@ def product(
>>> ps.Series([]).prod(min_count=1) # doctest: +SKIP
nan
"""
if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.product with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -1870,6 +1884,13 @@ def std(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.std with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -1962,6 +1983,13 @@ def var(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.var with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
@@ -2191,6 +2219,13 @@ def sem(
if not isinstance(ddof, int):
raise TypeError("ddof must be integer")

if axis is None and isinstance(self, ps.DataFrame):
warnings.warn(
"The behavior of DataFrame.sem with axis=None is deprecated, "
"in a future version this will reduce over both axes and return a scalar. "
"To retain the old behavior, pass axis=0 (or do not pass axis)",
FutureWarning,
)
axis = validate_axis(axis)

if numeric_only is None and axis == 0:
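All five reductions above (``sum``, ``product``, ``std``, ``var``, ``sem``) gain the same warning; a sketch of what it flags (illustrative frame):

import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2], "b": [3, 4]})

psdf.sum(axis=0)     # column-wise reduction, the historical behavior
psdf.sum(axis=None)  # warns: a future version will reduce both axes to a scalar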
@@ -2448,6 +2483,8 @@ def bool(self) -> bool:
This must be a boolean scalar value, either True or False. Raise a ValueError if
the object does not have exactly 1 element, or that element is not boolean

.. deprecated:: 4.0.0

Returns
-------
bool
@@ -2479,6 +2516,11 @@ def bool(self) -> bool:
...
ValueError: bool cannot act on a non-boolean single element DataFrame
"""
warnings.warn(
f"{self.__class__.__name__}.bool is now deprecated "
"and will be removed in future version.",
FutureWarning,
)
if isinstance(self, ps.DataFrame):
df = self
elif isinstance(self, ps.Series):
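A sketch of the ``bool()`` deprecation from the caller's side (illustrative single-element Series; using ``item()`` as the replacement is an assumption based on pandas, where it extracts the single scalar):

import pyspark.pandas as ps

psser = ps.Series([True])

psser.bool()  # deprecated in 4.0: emits a FutureWarning
psser.item()  # assumed replacement: returns the single element as a Python scalar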