Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REF: move Block construction in groupby aggregation to internals #39997

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from functools import partial
from textwrap import dedent
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -25,7 +24,6 @@
List,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Union,
Expand Down Expand Up @@ -86,10 +84,7 @@
Categorical,
ExtensionArray,
)
from pandas.core.base import (
DataError,
SpecificationError,
)
from pandas.core.base import SpecificationError
import pandas.core.common as com
from pandas.core.construction import create_series_with_explicit_dtype
from pandas.core.frame import DataFrame
Expand All @@ -115,10 +110,6 @@

from pandas.plotting import boxplot_frame_groupby

if TYPE_CHECKING:
from pandas.core.internals import Block


NamedAgg = namedtuple("NamedAgg", ["column", "aggfunc"])
# TODO(typing) the return value on this callable should be any *scalar*.
AggScalar = Union[str, Callable[..., Any]]
Expand Down Expand Up @@ -1074,7 +1065,7 @@ def _cython_agg_general(
agg_mgr = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count
)
return self._wrap_agged_blocks(agg_mgr.blocks, items=agg_mgr.items)
return self._wrap_agged_manager(agg_mgr)

def _cython_agg_blocks(
self, how: str, alt=None, numeric_only: bool = True, min_count: int = -1
Expand Down Expand Up @@ -1174,11 +1165,7 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
# NotImplementedError -> "ohlc" with wrong dtype
new_mgr = data.apply(blk_func, ignore_failures=True)

if not len(new_mgr):
raise DataError("No numeric types to aggregate")

new_mgr = data.grouped_reduce(blk_func, ignore_failures=True)
return new_mgr

def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
Expand Down Expand Up @@ -1748,17 +1735,17 @@ def _wrap_transformed_output(

return result

def _wrap_agged_blocks(self, blocks: Sequence[Block], items: Index) -> DataFrame:
def _wrap_agged_manager(self, mgr: BlockManager) -> DataFrame:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed this is an improvement

if not self.as_index:
index = np.arange(blocks[0].values.shape[-1])
mgr = BlockManager(blocks, axes=[items, index])
index = np.arange(mgr.shape[1])
mgr.set_axis(1, ibase.Index(index))
result = self.obj._constructor(mgr)

self._insert_inaxis_grouper_inplace(result)
result = result._consolidate()
else:
index = self.grouper.result_index
mgr = BlockManager(blocks, axes=[items, index])
mgr.set_axis(1, index)
result = self.obj._constructor(mgr)

if self.axis == 1:
Expand Down Expand Up @@ -1808,13 +1795,13 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
counted = lib.count_level_2d(masked, labels=ids, max_bin=ngroups, axis=1)
return counted

new_mgr = data.apply(hfunc)
new_mgr = data.grouped_reduce(hfunc)

# If we are grouping on categoricals we want unobserved categories to
# return zero, rather than the default of NaN which the reindexing in
# _wrap_agged_blocks() returns. GH 35028
# _wrap_agged_manager() returns. GH 35028
with com.temp_setattr(self, "observed", True):
result = self._wrap_agged_blocks(new_mgr.blocks, items=data.items)
result = self._wrap_agged_manager(new_mgr)

return self._reindex_output(result, fill_value=0)

Expand Down
38 changes: 38 additions & 0 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@

import pandas.core.algorithms as algos
from pandas.core.arrays.sparse import SparseDtype
from pandas.core.base import DataError
from pandas.core.construction import extract_array
from pandas.core.indexers import maybe_convert_indices
from pandas.core.indexes.api import (
Expand Down Expand Up @@ -403,6 +404,43 @@ def reduce(
new_mgr = type(self).from_blocks(res_blocks, [self.items, index])
return new_mgr, indexer

def grouped_reduce(
self: T, func: Callable, ignore_failures: bool = False
) -> Tuple[T, np.ndarray]:
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved
"""
Apply grouped reduction function blockwise, returning a new BlockManager.

Parameters
----------
func : grouped reduction function
ignore_failures : bool, default False
Whether to drop blocks where func raises TypeError.

Returns
-------
BlockManager
"""
result_blocks: List[Block] = []

for blk in self.blocks:
try:
applied = blk.apply(func)
except (TypeError, NotImplementedError):
if not ignore_failures:
raise
continue
result_blocks = extend_blocks(applied, result_blocks)

if len(result_blocks) == 0:
raise DataError("No numeric types to aggregate")
jorisvandenbossche marked this conversation as resolved.
Show resolved Hide resolved

index = Index(range(result_blocks[0].values.shape[-1]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we figure out this length before calling grouped_reduce? if so, i think that we can turn it into an argument, then refactor reduce, apply and possibly quantile to dispatch to this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The length can certainly be known before hand for the groupby case (self.grouper.ngroups), I am only not fully sure this is a nice API for the Manager


if ignore_failures:
return self._combine(result_blocks, index=index)

return type(self).from_blocks(result_blocks, [self.axes[0], index])

def operate_blockwise(self, other: BlockManager, array_op) -> BlockManager:
"""
Apply array_op blockwise with another (aligned) BlockManager.
Expand Down