Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ArrayManager] GroupBy cython aggregations (no fallback) #39885

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ jobs:
pytest pandas/tests/reductions/ --array-manager
pytest pandas/tests/generic/test_generic.py --array-manager
pytest pandas/tests/arithmetic/ --array-manager
pytest pandas/tests/groupby/aggregate/ --array-manager
pytest pandas/tests/reshape/merge --array-manager

# indexing subset (temporary since other tests don't pass yet)
Expand Down
40 changes: 25 additions & 15 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
ArrayLike,
FrameOrSeries,
FrameOrSeriesUnion,
Manager,
)
from pandas.util._decorators import (
Appender,
Expand Down Expand Up @@ -107,7 +108,10 @@
all_indexes_same,
)
import pandas.core.indexes.base as ibase
from pandas.core.internals import BlockManager
from pandas.core.internals import (
ArrayManager,
BlockManager,
)
from pandas.core.series import Series
from pandas.core.util.numba_ import maybe_use_numba

Expand Down Expand Up @@ -1074,20 +1078,22 @@ def _iterate_slices(self) -> Iterable[Series]:
def _cython_agg_general(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
) -> DataFrame:
agg_mgr = self._cython_agg_blocks(
agg_mgr = self._cython_agg_manager(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_manager(agg_mgr)

def _cython_agg_blocks(
def _cython_agg_manager(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
) -> BlockManager:
) -> Manager:

data: BlockManager = self._get_data_to_aggregate()
data: Manager = self._get_data_to_aggregate()

if numeric_only:
data = data.get_numeric_data(copy=False)

using_array_manager = isinstance(data, ArrayManager)

def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
# see if we can cast the values to the desired dtype
# this may not be the original dtype
Expand All @@ -1101,7 +1107,11 @@ def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
result = type(values)._from_sequence(result.ravel(), dtype=values.dtype)
# Note this will have result.dtype == dtype from above

elif isinstance(result, np.ndarray) and result.ndim == 1:
elif (
not using_array_manager
and isinstance(result, np.ndarray)
and result.ndim == 1
):
# We went through a SeriesGroupByPath and need to reshape
# GH#32223 includes case with IntegerArray values
result = result.reshape(1, -1)
Expand Down Expand Up @@ -1153,11 +1163,11 @@ def py_fallback(bvalues: ArrayLike) -> ArrayLike:
result = mgr.blocks[0].values
return result

def blk_func(bvalues: ArrayLike) -> ArrayLike:
def array_func(values: ArrayLike) -> ArrayLike:

try:
result = self.grouper._cython_operation(
"aggregate", bvalues, how, axis=1, min_count=min_count
"aggregate", values, how, axis=1, min_count=min_count
)
except NotImplementedError:
# generally if we have numeric_only=False
Expand All @@ -1170,14 +1180,14 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
assert how == "ohlc"
raise

result = py_fallback(bvalues)
result = py_fallback(values)

return cast_agg_result(result, bvalues, how)
return cast_agg_result(result, values, how)

# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
# NotImplementedError -> "ohlc" with wrong dtype
new_mgr = data.grouped_reduce(blk_func, ignore_failures=True)
new_mgr = data.grouped_reduce(array_func, ignore_failures=True)

if not len(new_mgr):
raise DataError("No numeric types to aggregate")
Expand Down Expand Up @@ -1670,7 +1680,7 @@ def _wrap_frame_output(self, result, obj: DataFrame) -> DataFrame:
else:
return self.obj._constructor(result, index=obj.index, columns=result_index)

def _get_data_to_aggregate(self) -> BlockManager:
def _get_data_to_aggregate(self) -> Manager:
obj = self._obj_with_exclusions
if self.axis == 1:
return obj.T._mgr
Expand Down Expand Up @@ -1755,17 +1765,17 @@ def _wrap_transformed_output(

return result

def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame:
def _wrap_agged_manager(self, mgr: Manager) -> DataFrame:
if not self.as_index:
index = np.arange(mgr.shape[1])
mgr.axes[1] = ibase.Index(index)
mgr.set_axis(1, ibase.Index(index), verify_integrity=False)
result = self.obj._constructor(mgr)

self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
mgr.axes[1] = index
mgr.set_axis(1, index, verify_integrity=False)
result = self.obj._constructor(mgr)

if self.axis == 1:
Expand Down
46 changes: 36 additions & 10 deletions pandas/core/internals/array_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,20 @@ def _normalize_axis(axis):
axis = 1 if axis == 0 else 0
return axis

# TODO can be shared
def set_axis(self, axis: int, new_labels: Index) -> None:
def set_axis(
self, axis: int, new_labels: Index, verify_integrity: bool = True
) -> None:
# Caller is responsible for ensuring we have an Index object.
axis = self._normalize_axis(axis)
old_len = len(self._axes[axis])
new_len = len(new_labels)
if verify_integrity:
old_len = len(self._axes[axis])
new_len = len(new_labels)

if new_len != old_len:
raise ValueError(
f"Length mismatch: Expected axis has {old_len} elements, new "
f"values have {new_len} elements"
)
if new_len != old_len:
raise ValueError(
f"Length mismatch: Expected axis has {old_len} elements, new "
f"values have {new_len} elements"
)

self._axes[axis] = new_labels

Expand Down Expand Up @@ -254,6 +256,30 @@ def reduce(
new_mgr = type(self)(result_arrays, [index, columns])
return new_mgr, indexer

def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T:
    """
    Apply grouped reduction function columnwise, returning a new ArrayManager.

    Parameters
    ----------
    func : grouped reduction function
        Callable taking a single 1D array (one column) and returning the
        reduced array with one entry per group.
    ignore_failures : bool, default False
        Whether to drop columns where func raises TypeError.

    Returns
    -------
    ArrayManager
    """
    # TODO ignore_failures is accepted for BlockManager API compatibility
    # but not yet honored here: a failing column currently propagates the
    # exception instead of being dropped.
    result_arrays = [func(arr) for arr in self.arrays]

    if len(result_arrays) == 0:
        # No columns to aggregate -> the result length (number of groups)
        # cannot be derived; use a single-element placeholder index.
        index = Index([None])  # placeholder
    else:
        # Each reduced array has one entry per group; take the result
        # length from the first column.
        index = Index(range(result_arrays[0].shape[0]))

    # Column labels (self.items) are preserved; only the row axis changes.
    return type(self)(result_arrays, [index, self.items])

def operate_blockwise(self, other: ArrayManager, array_op) -> ArrayManager:
"""
Apply array_op blockwise with another (aligned) ArrayManager.
Expand Down Expand Up @@ -369,7 +395,7 @@ def apply_with_block(self: T, f, align_keys=None, **kwargs) -> T:
if hasattr(arr, "tz") and arr.tz is None: # type: ignore[union-attr]
# DatetimeArray needs to be converted to ndarray for DatetimeBlock
arr = arr._data # type: ignore[union-attr]
elif arr.dtype.kind == "m":
elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could combine this with previous check as

if arr.dtype.kind in ["m", "M"] and not isinstance(arr, np.ndarray):
    arr = arr._data

Copy link
Member Author

@jorisvandenbossche jorisvandenbossche Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be nice, but the problem is that we still need to keep DatetimeArray intact for DatetimeTZBlock. So we would still need the if hasattr(arr, "tz") and arr.tz is None check as well, in which case it doesn't necessarily become more readable to combine both checks.

Edit: the diff would be:

-            if hasattr(arr, "tz") and arr.tz is None:  # type: ignore[union-attr]
-                # DatetimeArray needs to be converted to ndarray for DatetimeBlock
-                arr = arr._data  # type: ignore[union-attr]
-            elif arr.dtype.kind == "m" and not isinstance(arr, np.ndarray):
-                # TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
+            if (
+                arr.dtype.kind == "m"
+                and not isinstance(arr, np.ndarray)
+                and getattr(arr, "tz", None) is None
+            ):
+                # DatetimeArray/TimedeltaArray needs to be converted to ndarray
+                # for DatetimeBlock/TimedeltaBlock (except DatetimeArray with tz,
+                # which needs to be preserved for DatetimeTZBlock)
                 arr = arr._data  # type: ignore[union-attr]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of and getattr(arr, "tz", None) is None how about isinstance(arr.dtype, np.dtype). either way works i guess

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That still gives the same length of the if check as in my diff example above, which I don't find an improvement in readability

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yah the only possible difference is for mypy

# TimedeltaArray needs to be converted to ndarray for TimedeltaBlock
arr = arr._data # type: ignore[union-attr]
if isinstance(arr, np.ndarray):
Expand Down
24 changes: 13 additions & 11 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,19 @@ def shape(self) -> Shape:
def ndim(self) -> int:
return len(self.axes)

def set_axis(self, axis: int, new_labels: Index) -> None:
def set_axis(
self, axis: int, new_labels: Index, verify_integrity: bool = True
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a verify_integrity keyword here, and put the length verification behind if verify_integrity. Reason I needed this is because in groupby, we are sometimes setting an Index with a different length as the original one.

) -> None:
# Caller is responsible for ensuring we have an Index object.
old_len = len(self.axes[axis])
new_len = len(new_labels)
if verify_integrity:
old_len = len(self.axes[axis])
new_len = len(new_labels)

if new_len != old_len:
raise ValueError(
f"Length mismatch: Expected axis has {old_len} elements, new "
f"values have {new_len} elements"
)
if new_len != old_len:
raise ValueError(
f"Length mismatch: Expected axis has {old_len} elements, new "
f"values have {new_len} elements"
)

self.axes[axis] = new_labels

Expand Down Expand Up @@ -282,16 +285,15 @@ def get_dtypes(self):
return algos.take_nd(dtypes, self.blknos, allow_fill=False)

@property
def arrays(self):
def arrays(self) -> List[ArrayLike]:
"""
Quick access to the backing arrays of the Blocks.

Only for compatibility with ArrayManager for testing convenience.
Not to be used in actual code, and return value is not the same as the
ArrayManager method (list of 1D arrays vs iterator of 2D ndarrays / 1D EAs).
"""
for blk in self.blocks:
yield blk.values
return [blk.values for blk in self.blocks]

def __getstate__(self):
block_values = [b.values for b in self.blocks]
Expand Down
7 changes: 7 additions & 0 deletions pandas/tests/groupby/aggregate/test_aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pytest

from pandas.errors import PerformanceWarning
import pandas.util._test_decorators as td

from pandas.core.dtypes.common import is_integer_dtype

Expand Down Expand Up @@ -45,6 +46,7 @@ def test_agg_regression1(tsframe):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) quantile/describe
def test_agg_must_agg(df):
grouped = df.groupby("A")["C"]

Expand Down Expand Up @@ -134,6 +136,7 @@ def test_groupby_aggregation_multi_level_column():
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg
def test_agg_apply_corner(ts, tsframe):
# nothing to group, all NA
grouped = ts.groupby(ts * np.nan)
Expand Down Expand Up @@ -212,6 +215,7 @@ def test_aggregate_str_func(tsframe, groupbyfunc):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) non-cython agg
def test_agg_str_with_kwarg_axis_1_raises(df, reduction_func):
gb = df.groupby(level=0)
if reduction_func in ("idxmax", "idxmin"):
Expand Down Expand Up @@ -491,6 +495,7 @@ def test_agg_index_has_complex_internals(index):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_agg_split_block():
# https://github.com/pandas-dev/pandas/issues/31522
df = DataFrame(
Expand All @@ -508,6 +513,7 @@ def test_agg_split_block():
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_agg_split_object_part_datetime():
# https://github.com/pandas-dev/pandas/pull/31616
df = DataFrame(
Expand Down Expand Up @@ -1199,6 +1205,7 @@ def test_aggregate_datetime_objects():
tm.assert_series_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) agg py_fallback
def test_aggregate_numeric_object_dtype():
# https://github.com/pandas-dev/pandas/issues/39329
# simplified case: multiple object columns where one is all-NaN
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/aggregate/test_cython.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def test_read_only_buffer_source_agg(agg):
"species": ["setosa", "setosa", "setosa", "setosa", "setosa"],
}
)
df._mgr.blocks[0].values.flags.writeable = False
df._mgr.arrays[0].flags.writeable = False

result = df.groupby(["species"]).agg({"sepal_length": agg})
expected = df.copy().groupby(["species"]).agg({"sepal_length": agg})
Expand Down
3 changes: 3 additions & 0 deletions pandas/tests/groupby/aggregate/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import numpy as np
import pytest

import pandas.util._test_decorators as td

import pandas as pd
from pandas import (
DataFrame,
Expand Down Expand Up @@ -412,6 +414,7 @@ def __call__(self, x):
tm.assert_frame_equal(result, expected)


@td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) columns with ndarrays
def test_agg_over_numpy_arrays():
# GH 3788
df = DataFrame(
Expand Down