REF: move Block construction in groupby aggregation to internals (#39997)
jorisvandenbossche authored Feb 24, 2021
1 parent c545701 commit 408216c
Showing 2 changed files with 44 additions and 15 deletions.
24 changes: 9 additions & 15 deletions pandas/core/groupby/generic.py
@@ -15,7 +15,6 @@
 from functools import partial
 from textwrap import dedent
 from typing import (
-    TYPE_CHECKING,
     Any,
     Callable,
     Dict,
@@ -25,7 +24,6 @@
     List,
     Mapping,
     Optional,
-    Sequence,
     Type,
     TypeVar,
     Union,
@@ -115,10 +113,6 @@
 
 from pandas.plotting import boxplot_frame_groupby
 
-if TYPE_CHECKING:
-    from pandas.core.internals import Block
-
-
 NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])
 # TODO(typing) the return value on this callable should be any *scalar*.
 AggScalar = Union[str, Callable[..., Any]]
@@ -1083,7 +1077,7 @@ def _cython_agg_general(
         agg_mgr = self._cython_agg_blocks(
             how, alt=alt, numeric_only=numeric_only, min_count=min_count
         )
-        return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
+        return self._wrap_agged_manager(agg_mgr)
 
     def _cython_agg_blocks(
         self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
@@ -1183,7 +1177,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
         # TypeError -> we may have an exception in trying to aggregate
         # continue and exclude the block
         # NotImplementedError -> "ohlc" with wrong dtype
-        new_mgr = data.apply(blk_func, ignore_failures=True)
+        new_mgr = data.grouped_reduce(blk_func, ignore_failures=True)
 
         if not len(new_mgr):
             raise DataError("No numeric types to aggregate")
@@ -1761,17 +1755,17 @@ def _wrap_transformed_output(
 
         return result
 
-    def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame:
+    def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame:
         if not self.as_index:
-            index = np.arange(blocks[0].values.shape[-1])
-            mgr = BlockManager(blocks, axes=[items, index])
+            index = np.arange(mgr.shape[1])
+            mgr.axes[1] = ibase.Index(index)
             result = self.obj._constructor(mgr)
 
             self._insert_inaxis_grouper_inplace(result)
             result = result._consolidate()
         else:
             index = self.grouper.result_index
-            mgr = BlockManager(blocks, axes=[items, index])
+            mgr.axes[1] = index
             result = self.obj._constructor(mgr)
 
         if self.axis == 1:
@@ -1821,13 +1815,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
             counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1)
             return counted
 
-        new_mgr = data.apply(hfunc)
+        new_mgr = data.grouped_reduce(hfunc)
 
         # If we are grouping on categoricals we want unobserved categories to
         # return zero, rather than the default of NaN which the reindexing in
-        # _wrap_agged_blocks() returns. GH 35028
+        # _wrap_agged_manager() returns. GH 35028
         with com.temp_setattr(self, "observed", True):
-            result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items)
+            result = self._wrap_agged_manager(new_mgr)
 
         return self._reindex_output(result, fill_value=0)
 
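For orientation, a minimal sketch of the wrapping pattern the generic.py hunks switch to: the groupby layer no longer assembles a BlockManager from raw blocks; it receives the already-reduced manager from the internals and only replaces its row axis. The TinyManager class and function names below are illustrative stand-ins, not the pandas classes themselves.

# Illustrative sketch only -- simplified stand-ins for BlockManager and the
# old/new _wrap_agged_* patterns shown in the diff above.
from typing import List

import numpy as np


class TinyManager:
    """Toy analogue of a BlockManager: 2D blocks plus [column, row] axes."""

    def __init__(self, blocks: List[np.ndarray], axes: list):
        self.blocks = blocks      # each block: (n_block_columns, n_rows)
        self.axes = axes          # axes[0] = column labels, axes[1] = row index

    @property
    def shape(self):
        n_rows = self.blocks[0].shape[-1] if self.blocks else 0
        return (len(self.axes[0]), n_rows)


def wrap_old(blocks: List[np.ndarray], items, result_index) -> TinyManager:
    # Pre-refactor pattern: the groupby layer built the manager itself from blocks.
    return TinyManager(blocks, axes=[items, result_index])


def wrap_new(mgr: TinyManager, result_index) -> TinyManager:
    # Post-refactor pattern: the internals already returned a reduced manager;
    # the groupby layer only swaps in the result index (or a positional range
    # when as_index=False) before handing it to the DataFrame constructor.
    mgr.axes[1] = result_index
    return mgr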
35 changes: 35 additions & 0 deletions pandas/core/internals/managers.py
@@ -403,6 +403,41 @@ def reduce(
         new_mgr = type(self).from_blocks(res_blocks, [self.items, index])
         return new_mgr, indexer
 
+    def grouped_reduce(self: T, func: Callable, ignore_failures: bool = False) -> T:
+        """
+        Apply grouped reduction function blockwise, returning a new BlockManager.
+
+        Parameters
+        ----------
+        func : grouped reduction function
+        ignore_failures : bool, default False
+            Whether to drop blocks where func raises TypeError.
+
+        Returns
+        -------
+        BlockManager
+        """
+        result_blocks: List[Block] = []
+
+        for blk in self.blocks:
+            try:
+                applied = blk.apply(func)
+            except (TypeError, NotImplementedError):
+                if not ignore_failures:
+                    raise
+                continue
+            result_blocks = extend_blocks(applied, result_blocks)
+
+        if len(result_blocks) == 0:
+            index = Index([None])  # placeholder
+        else:
+            index = Index(range(result_blocks[0].values.shape[-1]))
+
+        if ignore_failures:
+            return self._combine(result_blocks, index=index)
+
+        return type(self).from_blocks(result_blocks, [self.axes[0], index])
+
     def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager:
         """
         Apply array_op blockwise with another (aligned) BlockManager.
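As a rough, self-contained illustration of the blockwise grouped-reduce pattern that grouped_reduce adds (plain NumPy stand-ins and a hypothetical group_sum reducer; the real method operates on Block objects and returns a BlockManager):

# Standalone sketch of the grouped-reduce-blockwise pattern -- toy NumPy
# stand-ins, not the pandas internals themselves.
from typing import Callable, List

import numpy as np


def grouped_reduce_sketch(
    blocks: List[np.ndarray],                  # each block: (n_block_columns, n_rows)
    func: Callable[[np.ndarray], np.ndarray],  # block -> (n_block_columns, n_groups)
    ignore_failures: bool = False,
):
    """Apply func block by block; optionally skip blocks where it raises TypeError."""
    result_blocks: List[np.ndarray] = []
    for blk in blocks:
        try:
            result_blocks.append(func(blk))
        except (TypeError, NotImplementedError):
            if not ignore_failures:
                raise
            continue                           # exclude this block, keep going

    if not result_blocks:
        new_index = np.array([None])           # placeholder, mirroring Index([None]) above
    else:
        # one row label per group in the reduced result
        new_index = np.arange(result_blocks[0].shape[-1])
    return result_blocks, new_index


# Usage: for each column in a block, sum its values within two groups of rows
# (group labels 0/1 over 4 rows).
labels = np.array([0, 0, 1, 1])


def group_sum(block: np.ndarray) -> np.ndarray:
    return np.stack([block[:, labels == g].sum(axis=1) for g in (0, 1)], axis=1)


blocks = [np.arange(8.0).reshape(2, 4)]        # one block with 2 columns, 4 rows
reduced, idx = grouped_reduce_sketch(blocks, group_sum)
# reduced[0] == [[1., 5.], [9., 13.]]   idx == [0, 1]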
