Merge branch 'main' into topk
* main:
  Fix first, last again (#381)
  Fix docs build (#382)
  Optimize for-loop merging of cohorts. (#378)
  Add cohorts snapshot tests with syrupy (#379)
  Stricter tolerance in property tests (#377)
  Better gridlines
  Update containment image with gridlines, higher dpi
dcherian committed Aug 7, 2024
2 parents 80c67f4 + f0ce343 commit 7056d18
Showing 21 changed files with 14,588 additions and 86 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci-additional.yaml
@@ -128,7 +128,8 @@ jobs:
- name: Run mypy
run: |
python -m mypy --install-types --non-interactive --cobertura-xml-report mypy_report
mkdir .mypy_cache
python -m mypy --install-types --non-interactive --cache-dir=.mypy_cache/ --cobertura-xml-report mypy_report
- name: Upload mypy coverage to Codecov
uses: codecov/codecov-action@v4.5.0
54 changes: 10 additions & 44 deletions .github/workflows/ci.yaml
@@ -16,7 +16,7 @@ concurrency:

jobs:
test:
name: Test (${{ matrix.python-version }}, ${{ matrix.os }})
name: Test (${{matrix.env}}, ${{ matrix.python-version }}, ${{ matrix.os }})
runs-on: ${{ matrix.os }}
defaults:
run:
@@ -25,10 +25,18 @@ jobs:
fail-fast: false
matrix:
os: ["ubuntu-latest"]
env: ["environment"]
python-version: ["3.9", "3.12"]
include:
- os: "windows-latest"
env: "environment"
python-version: "3.12"
- os: "ubuntu-latest"
env: "no-dask" # "no-xarray", "no-numba"
python-version: "3.12"
- os: "ubuntu-latest"
env: "minimal-requirements"
python-version: "3.9"
steps:
- uses: actions/checkout@v4
with:
@@ -39,7 +47,7 @@ jobs:
- name: Set up conda environment
uses: mamba-org/setup-micromamba@v1
with:
environment-file: ci/environment.yml
environment-file: ci/${{ matrix.env }}.yml
environment-name: flox-tests
init-shell: bash
cache-environment: true
@@ -81,48 +89,6 @@ jobs:
path: .hypothesis/
key: cache-hypothesis-${{ runner.os }}-${{ matrix.python-version }}-${{ github.run_id }}

optional-deps:
name: ${{ matrix.env }}
runs-on: "ubuntu-latest"
defaults:
run:
shell: bash -l {0}
strategy:
fail-fast: false
matrix:
python-version: ["3.12"]
env: ["no-dask"] # "no-xarray", "no-numba"
include:
- env: "minimal-requirements"
python-version: "3.9"
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0 # Fetch all history for all branches and tags.
- name: Set up conda environment
uses: mamba-org/setup-micromamba@v1
with:
environment-file: ci/${{ matrix.env }}.yml
environment-name: flox-tests
init-shell: bash
cache-environment: true
create-args: |
python=${{ matrix.python-version }}
- name: Install flox
run: |
python -m pip install --no-deps -e .
- name: Run tests
run: |
python -m pytest -n auto --cov=./ --cov-report=xml
- name: Upload code coverage to Codecov
uses: codecov/codecov-action@v4.5.0
with:
file: ./coverage.xml
flags: unittests
env_vars: RUNNER_OS
name: codecov-umbrella
fail_ci_if_error: false

xarray-groupby:
name: xarray-groupby
runs-on: ubuntu-latest
Empty file added asv_bench/__init__.py
6 changes: 5 additions & 1 deletion asv_bench/benchmarks/cohorts.py
@@ -47,6 +47,9 @@ def time_find_group_cohorts(self):
except AttributeError:
pass

def track_num_cohorts(self):
return len(self.chunks_cohorts())

def time_graph_construct(self):
flox.groupby_reduce(self.array, self.by, func="sum", axis=self.axis)

@@ -60,10 +63,11 @@ def track_num_tasks_optimized(self):
def track_num_layers(self):
return len(self.result.dask.layers)

track_num_cohorts.unit = "cohorts" # type: ignore[attr-defined] # Lazy
track_num_tasks.unit = "tasks" # type: ignore[attr-defined] # Lazy
track_num_tasks_optimized.unit = "tasks" # type: ignore[attr-defined] # Lazy
track_num_layers.unit = "layers" # type: ignore[attr-defined] # Lazy
for f in [track_num_tasks, track_num_tasks_optimized, track_num_layers]:
for f in [track_num_tasks, track_num_tasks_optimized, track_num_layers, track_num_cohorts]:
f.repeat = 1 # type: ignore[attr-defined] # Lazy
f.rounds = 1 # type: ignore[attr-defined] # Lazy
f.number = 1 # type: ignore[attr-defined] # Lazy
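The `track_num_cohorts` addition above follows airspeed velocity's convention that a `track_*` method returns a number to record for each benchmarked commit, with a `unit` attribute describing what is counted. A minimal sketch of that convention, with illustrative names and data rather than flox's actual benchmark setup:

```python
# Sketch of an asv "track" benchmark; class, method, and data are illustrative.
class CohortsSuite:
    def setup(self):
        # setup() runs before each benchmark method is timed/tracked
        self.cohorts = {(0, 1): ["a", "b"], (2,): ["c"]}

    def track_num_cohorts(self):
        # asv stores this return value for every commit it benchmarks,
        # so changes in the number of cohorts show up over time
        return len(self.cohorts)

    # the unit attribute labels the tracked quantity in asv's reports
    track_num_cohorts.unit = "cohorts"
```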
2 changes: 1 addition & 1 deletion ci/docs.yml
@@ -16,7 +16,7 @@ dependencies:
- myst-parser
- myst-nb
- sphinx
- furo
- furo>=2024.08
- ipykernel
- jupyter
- sphinx-codeautolink
1 change: 1 addition & 0 deletions ci/environment.yml
@@ -18,6 +18,7 @@ dependencies:
- pytest-cov
- pytest-pretty
- pytest-xdist
- syrupy
- xarray
- pre-commit
- numpy_groupies>=0.9.19
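`syrupy` is added to this and the following CI environments because the new cohorts snapshot tests (#379, listed in the merge message) rely on it. A minimal sketch of a syrupy-style snapshot test, assuming the pytest plugin is installed; the test name and data are illustrative, not taken from flox's test suite:

```python
# Hypothetical snapshot test using syrupy's `snapshot` pytest fixture.
def test_chunks_cohorts_snapshot(snapshot):
    cohorts = {(0, 1): ["a", "b"], (2,): ["c"]}
    # The first run (with `pytest --snapshot-update`) writes the snapshot to
    # disk; subsequent runs fail if the computed value drifts from it.
    assert cohorts == snapshot
```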
1 change: 1 addition & 0 deletions ci/minimal-requirements.yml
@@ -9,6 +9,7 @@ dependencies:
- pytest-cov
- pytest-pretty
- pytest-xdist
- syrupy
- numpy==1.22
- scipy==1.9.0
- numpy_groupies==0.9.19
1 change: 1 addition & 0 deletions ci/no-dask.yml
@@ -13,6 +13,7 @@ dependencies:
- pytest-cov
- pytest-pretty
- pytest-xdist
- syrupy
- xarray
- numpydoc
- pre-commit
1 change: 1 addition & 0 deletions ci/no-numba.yml
@@ -18,6 +18,7 @@ dependencies:
- pytest-cov
- pytest-pretty
- pytest-xdist
- syrupy
- xarray
- pre-commit
- numpy_groupies>=0.9.19
2 changes: 2 additions & 0 deletions ci/no-xarray.yml
@@ -3,6 +3,7 @@ channels:
- conda-forge
dependencies:
- codecov
- syrupy
- pandas
- numpy>=1.22
- scipy
@@ -11,6 +12,7 @@ dependencies:
- pytest-cov
- pytest-pretty
- pytest-xdist
- syrupy
- dask-core
- numpydoc
- pre-commit
1 change: 1 addition & 0 deletions ci/upstream-dev-env.yml
@@ -11,6 +11,7 @@ dependencies:
# - scipy
- pytest-pretty
- pytest-xdist
- syrupy
- pip
# for cftime
- cython>=0.29.20
Binary file modified docs/diagrams/containment.png
2 changes: 1 addition & 1 deletion flox/__init__.py
@@ -2,7 +2,7 @@
# flake8: noqa
"""Top-level module for flox ."""
from . import cache
from .aggregations import Aggregation # noqa
from .aggregations import Aggregation, Scan # noqa
from .core import groupby_reduce, groupby_scan, rechunk_for_blockwise, rechunk_for_cohorts # noqa


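For context, the public entry point exercised by the benchmark above and by most of the changes below is `flox.groupby_reduce`. A minimal usage sketch with made-up data, mirroring the call shown in `asv_bench/benchmarks/cohorts.py`; the returned object pairs the reduced values with the discovered group labels (see the flox API docs for the exact layout):

```python
import numpy as np
import flox

array = np.array([1, 2, 3, 4, 5, 6])
by = np.array([0, 0, 1, 1, 2, 2])  # group labels, one per element of `array`
flox.groupby_reduce(array, by, func="sum")  # per-group sums: 1+2, 3+4, 5+6
```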
92 changes: 68 additions & 24 deletions flox/core.py
@@ -171,7 +171,9 @@ def _is_minmax_reduction(func: T_Agg) -> bool:


def _is_first_last_reduction(func: T_Agg) -> bool:
return isinstance(func, str) and func in ["nanfirst", "nanlast", "first", "last"]
if isinstance(func, Aggregation):
func = func.name
return func in ["nanfirst", "nanlast", "first", "last"]


def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
@@ -395,14 +397,16 @@ def find_group_cohorts(
chunks_per_label = chunks_per_label[present_labels_mask]

label_chunks = {
present_labels[idx]: bitmask.indices[slice(bitmask.indptr[idx], bitmask.indptr[idx + 1])]
present_labels[idx].item(): bitmask.indices[
slice(bitmask.indptr[idx], bitmask.indptr[idx + 1])
]
for idx in range(bitmask.shape[LABEL_AXIS])
}

# Invert the label_chunks mapping so we know which labels occur together.
def invert(x) -> tuple[np.ndarray, ...]:
arr = label_chunks[x]
return tuple(arr)
return tuple(arr.tolist())

chunks_cohorts = tlz.groupby(invert, label_chunks.keys())

@@ -476,36 +480,52 @@ def invert(x) -> tuple[np.ndarray, ...]:
containment.nnz / math.prod(containment.shape)
)
)
# Use a threshold to force some merging. We do not use the filtered
# containment matrix for estimating "sparsity" because it is a bit
# hard to reason about.

# Next we for-loop over groups and merge those that are quite similar.
# Use a threshold on containment to always force some merging.
# Note that we do not use the filtered containment matrix for estimating "sparsity"
# because it is a bit hard to reason about.
MIN_CONTAINMENT = 0.75 # arbitrary
mask = containment.data < MIN_CONTAINMENT

# Now we also know "exact cohorts" -- cohorts whose constituent groups
# occur in exactly the same chunks. We only need examine one member of each group.
# Skip the others by first looping over the exact cohorts, and zero out those rows.
repeated = np.concatenate([v[1:] for v in chunks_cohorts.values()]).astype(int)
repeated_idx = np.searchsorted(present_labels, repeated)
for i in repeated_idx:
mask[containment.indptr[i] : containment.indptr[i + 1]] = True
containment.data[mask] = 0
containment.eliminate_zeros()

# Iterate over labels, beginning with those with most chunks
# Figure out all the labels we need to loop over later
n_overlapping_labels = containment.astype(bool).sum(axis=1)
order = np.argsort(n_overlapping_labels, kind="stable")[::-1]
# Order is such that we iterate over labels, beginning with those with most overlaps
# Also filter out any "exact" cohorts
order = order[n_overlapping_labels[order] > 0]

logger.debug("find_group_cohorts: merging cohorts")
order = np.argsort(containment.sum(axis=LABEL_AXIS))[::-1]
merged_cohorts = {}
merged_keys = set()
# TODO: we can optimize this to loop over chunk_cohorts instead
# by zeroing out rows that are already in a cohort
for rowidx in order:
if present_labels[rowidx] in merged_keys:
continue
cohidx = containment.indices[
slice(containment.indptr[rowidx], containment.indptr[rowidx + 1])
]
cohort_ = present_labels[cohidx]
cohort = [elem for elem in cohort_ if elem not in merged_keys]
cohort = [elem.item() for elem in cohort_ if elem not in merged_keys]
if not cohort:
continue
merged_keys.update(cohort)
allchunks = (label_chunks[member] for member in cohort)
allchunks = (label_chunks[member].tolist() for member in cohort)
chunk = tuple(set(itertools.chain(*allchunks)))
merged_cohorts[chunk] = cohort

actual_ngroups = np.concatenate(tuple(merged_cohorts.values())).size
expected_ngroups = present_labels.size
assert len(merged_keys) == actual_ngroups
assert expected_ngroups == actual_ngroups, (expected_ngroups, actual_ngroups)

# sort by first label in cohort
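The loop above merges labels whose containment exceeds `MIN_CONTAINMENT`, visiting labels with the most overlaps first and skipping labels already claimed by an earlier cohort. A simplified dense-array sketch of that idea with made-up numbers; the real code operates on a sparse CSR matrix and additionally zeroes out rows belonging to exact cohorts:

```python
# Simplified containment-threshold merging on a small dense matrix.
# Values are illustrative; flox uses a scipy.sparse CSR matrix instead.
import numpy as np

MIN_CONTAINMENT = 0.75
labels = np.array(["a", "b", "c"])
containment = np.array(
    [
        [1.0, 0.9, 0.1],  # pairwise chunk-overlap fractions (made up)
        [0.8, 1.0, 0.0],
        [0.1, 0.0, 1.0],
    ]
)
containment[containment < MIN_CONTAINMENT] = 0.0  # only merge strong overlaps

merged_cohorts, seen = [], set()
order = np.argsort((containment > 0).sum(axis=1), kind="stable")[::-1]
for i in order:  # labels with the most surviving overlaps go first
    if labels[i] in seen:
        continue
    cohort = [labels[j] for j in np.nonzero(containment[i])[0] if labels[j] not in seen]
    if cohort:
        seen.update(cohort)
        merged_cohorts.append(cohort)
# merged_cohorts -> [["a", "b"], ["c"]]: "a" and "b" merge; "c" stays alone
```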
@@ -1629,7 +1649,12 @@ def dask_groupby_agg(
# This allows us to discover groups at compute time, support argreductions, lower intermediate
# memory usage (but method="cohorts" would also work to reduce memory in some cases)
labels_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
do_grouped_combine = (
_is_arg_reduction(agg)
or labels_are_unknown
or (_is_first_last_reduction(agg) and array.dtype.kind != "f")
)
do_simple_combine = not do_grouped_combine

if method == "blockwise":
# use the "non dask" code path, but applied blockwise
@@ -1686,7 +1711,7 @@ def dask_groupby_agg(

tree_reduce = partial(
dask.array.reductions._tree_reduce,
name=f"{name}-reduce",
name=f"{name}-simple-reduce",
dtype=array.dtype,
axis=axis,
keepdims=True,
@@ -1721,14 +1746,20 @@ def dask_groupby_agg(
groups_ = []
for blks, cohort in chunks_cohorts.items():
cohort_index = pd.Index(cohort)
reindexer = partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
reindexer = (
partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
if do_simple_combine
else identity
)
reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer)
            # now that we have reindexed, we can set reindex=True explicitly
reduced_.append(
tree_reduce(
reindexed,
combine=partial(combine, agg=agg, reindex=True),
aggregate=partial(aggregate, expected_groups=cohort_index, reindex=True),
combine=partial(combine, agg=agg, reindex=do_simple_combine),
aggregate=partial(
aggregate, expected_groups=cohort_index, reindex=do_simple_combine
),
)
)
# This is done because pandas promotes to 64-bit types when an Index is created
@@ -1974,8 +2005,13 @@ def _validate_reindex(
expected_groups,
any_by_dask: bool,
is_dask_array: bool,
array_dtype: Any,
) -> bool | None:
# logger.debug("Entering _validate_reindex: reindex is {}".format(reindex)) # noqa
def first_or_last():
return func in ["first", "last"] or (
_is_first_last_reduction(func) and array_dtype.kind != "f"
)

all_numpy = not is_dask_array and not any_by_dask
if reindex is True and not all_numpy:
@@ -1985,7 +2021,7 @@ def _validate_reindex(
raise ValueError(
"reindex=True is not a valid choice for method='blockwise' or method='cohorts'."
)
if func in ["first", "last"]:
if first_or_last():
raise ValueError("reindex must be None or False when func is 'first' or 'last.")

if reindex is None:
@@ -1996,9 +2032,10 @@ def _validate_reindex(
if all_numpy:
return True

if func in ["first", "last"]:
if first_or_last():
# have to do the grouped_combine since there's no good fill_value
reindex = False
# Also needed for nanfirst, nanlast with no-NaN dtypes
return False

if method == "blockwise":
# for grouping by dask arrays, we set reindex=True
@@ -2403,12 +2440,19 @@ def groupby_reduce(
if method == "cohorts" and any_by_dask:
raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.")

if not is_duck_array(array):
array = np.asarray(array)

reindex = _validate_reindex(
reindex, func, method, expected_groups, any_by_dask, is_duck_dask_array(array)
reindex,
func,
method,
expected_groups,
any_by_dask,
is_duck_dask_array(array),
array.dtype,
)

if not is_duck_array(array):
array = np.asarray(array)
is_bool_array = np.issubdtype(array.dtype, bool)
array = array.astype(np.intp) if is_bool_array else array

@@ -2592,7 +2636,7 @@ def groupby_reduce(

# TODO: clean this up
reindex = _validate_reindex(
reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array)
reindex, func, method, expected_, any_by_dask, is_duck_dask_array(array), array.dtype
)

if TYPE_CHECKING: