Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REF: move Block construction in groupby aggregation to internals #39997

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from functools import partial
from textwrap import dedent
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -25,7 +24,6 @@
List,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Union,
Expand Down Expand Up @@ -115,10 +113,6 @@

from pandas.plotting import boxplot_frame_groupby

if TYPE_CHECKING:
from pandas.core.internals import Block


NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])
# TODO(typing) the return value on this callable should be any *scalar*.
AggScalar = Union[str, Callable[..., Any]]
Expand Down Expand Up @@ -1074,7 +1068,7 @@ def _cython_agg_general(
agg_mgr = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
return self._wrap_agged_manager(agg_mgr)

def _cython_agg_blocks(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
Expand Down Expand Up @@ -1174,7 +1168,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
# NotImplementedError -> "ohlc" with wrong dtype
new_mgr = data.apply(blk_func, ignore_failures=True)
new_mgr = data.grouped_reduce(blk_func, ignore_failures=True)

if not len(new_mgr):
raise DataError("No numeric types to aggregate")
Expand Down Expand Up @@ -1748,17 +1742,17 @@ def _wrap_transformed_output(

return result

def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame:
def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed this is an improvement

if not self.as_index:
index = np.arange(blocks[0].values.shape[-1])
mgr = BlockManager(blocks, axes=[items, index])
index = np.arange(mgr.shape[1])
mgr.axes[1] = ibase.Index(index)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is super janky, maybe can push this down somehow

result = self.obj._constructor(mgr)

self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
mgr = BlockManager(blocks, axes=[items, index])
mgr.axes[1] = index
result = self.obj._constructor(mgr)

if self.axis == 1:
Expand Down Expand Up @@ -1808,13 +1802,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1)
return counted

new_mgr = data.apply(hfunc)
new_mgr = data.grouped_reduce(hfunc)

# If we are grouping on categoricals we want unobserved categories to
# return zero, rather than the default of NaN which the reindexing in
# _wrap_agged_blocks() returns. GH 35028
# _wrap_agged_manager() returns. GH 35028
with com.temp_setattr(self, "observed", True):
result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items)
result = self._wrap_agged_manager(new_mgr)

return self._reindex_output(result, fill_value=0)

Expand Down
35 changes: 35 additions & 0 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,41 @@ def reduce(
new_mgr = type(self).from_blocks(res_blocks, [self.items, index])
return new_mgr, indexer

def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T:
"""
Apply grouped reduction function blockwise, returning a new BlockManager.

Parameters
----------
func : grouped reduction function
ignore_failures : bool, default False
Whether to drop blocks where func raises TypeError.

Returns
-------
BlockManager
"""
result_blocks: List[Block] = []

for blk in self.blocks:
try:
applied = blk.apply(func)
except (TypeError, NotImplementedError):
if not ignore_failures:
raise
continue
result_blocks = extend_blocks(applied, result_blocks)

if len(result_blocks) == 0:
index = Index([None]) # placeholder
else:
index = Index(range(result_blocks[0].values.shape[-1]))

if ignore_failures:
return self._combine(result_blocks, index=index)

return type(self).from_blocks(result_blocks, [self.axes[0], index])

def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager:
"""
Apply array_op blockwise with another (aligned) BlockManager.
Expand Down