
Rework cohorts algorithm and heuristics #396

Closed
dcherian opened this issue Sep 17, 2024 · 11 comments · Fixed by #415

dcherian (Collaborator) commented Sep 17, 2024

Example: ERA5

import dask.array
import flox.xarray
import pandas as pd
import xarray as xr

dims = ("time", "level", "lat", "lon")
# nyears is number of years, adjust to make bigger,
# full dataset is 60-ish years.
nyears = 10
shape = (nyears * 365 * 24, 37, 721, 1440)
chunks = (1, -1, -1, -1)  # one chunk per hourly time step

ds = xr.Dataset(
    {"U": (dims, dask.array.random.random(shape, chunks=chunks))},
    coords={"time": pd.date_range("2001-01-01", periods=shape[0], freq="H")},
)
ds.resample(time="D").mean()

All the overhead is in subset_to_blocks.
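For context, that overhead is paid while building the task graph, before anything is computed. A quick way to see it (the timing wrapper below is an illustration, not part of the original report):

import time

t0 = time.perf_counter()
resampled = ds.resample(time="D").mean()  # only builds the dask graph, no compute
print(f"graph construction took {time.perf_counter() - t0:.1f}s")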

dcherian added a commit that referenced this issue Sep 18, 2024
dcherian reopened this Sep 18, 2024
dcherian (Collaborator, Author) commented Sep 18, 2024

Now the overhead is in creating 1000s of dask arrays. Here's the profile of dask.array.Array.__new__, which is doing work that's totally useless here since I'm already passing in valid chunks and meta:

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
  1331     14606  124899000.0   8551.2     10.5          meta = meta_from_array(meta, dtype=dtype)
  1332                                           
  1333                                                   if (
  1334     14606    2636000.0    180.5      0.2              isinstance(chunks, str)
  1335     14606    2500000.0    171.2      0.2              or isinstance(chunks, tuple)
  1336      3656     510000.0    139.5      0.0              and chunks
  1337      3656    5453000.0   1491.5      0.5              and any(isinstance(c, str) for c in chunks)
  1338                                                   ):
  1339                                                       dt = meta.dtype
  1340                                                   else:
  1341     14606    1583000.0    108.4      0.1              dt = None
  1342     14606  259910000.0  17794.7     21.9          self._chunks = normalize_chunks(chunks, shape, dtype=dt)
  1343     14606    5396000.0    369.4      0.5          if self.chunks is None:
  1344                                                       raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
  1345     14606  587524000.0  40224.8     49.5          self._meta = meta_from_array(meta, ndim=self.ndim, dtype=dtype)

cc @phofl

The current approach in flox is pretty flawed --- it will create 10,000-ish layers, hehe --- but perhaps there are some quick wins.

dcherian (Collaborator, Author) commented

Alternatively, we need a better approach.
The current way is to select out a cohort, make that a new dask array, apply the tree reduction, then concatenate across cohorts.
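A toy sketch of that pattern (not flox's actual implementation; the shapes, chunking, and reducer below are made up) for the hourly-to-daily case from the first comment:

import dask.array as da

# hourly data, one chunk per hour along the time axis
nhours = 24 * 10
arr = da.random.random((nhours, 5, 5), chunks=(1, 5, 5))

daily = []
for day in range(nhours // 24):
    # select the blocks belonging to this cohort; every iteration builds a
    # brand-new dask array, which is where the graph-construction cost piles up
    cohort = arr.blocks[day * 24 : (day + 1) * 24]
    # reduce within the cohort (flox runs its tree reduction at this step)
    daily.append(cohort.mean(axis=0, keepdims=True))

# concatenate the per-cohort results
result = da.concatenate(daily, axis=0)

With decades of daily cohorts, that loop constructs thousands of intermediate Arrays, which is where the Array.__new__ profile above comes from.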

phofl commented Nov 4, 2024

Just linking my other comment here: dask/dask#11026 (comment)

dcherian (Collaborator, Author) commented Nov 4, 2024

Yes, some clever use of shuffle might be the way here.

phofl commented Nov 12, 2024

Can you help me understand a bit better what high-level API would be useful here?

My understanding of the steps involved is as follows:

  • map blocks to do a groupby().reducer() for every chunk in the beginning (?)
  • Push every group into a new array and then do a tree-reduce on each array
  • concatenation

The pattern of creating a new array per group comes with significant overhead and also makes life harder for the scheduler. A type of high-level API that would be useful here (based on my understanding):

  • Select each group into a different "branch" of an array
  • run a tree reduction on each branch of said array

Basically each array now would be a branch in the future, kind of like a shuffle but with no guarantee that each group ends up in a single array. Is that understanding roughly correct?
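If that understanding is right, the desired behaviour could be summarized by a helper like the hypothetical one below (the name, signature, and reducer are made up, and the naive reference implementation shown still builds one Array per branch, which is exactly the overhead a shuffle-style primitive would avoid):

import dask.array as da

def reduce_branches(arr, branches, reducer):
    # branches: list of lists of block indices along axis 0; each branch is
    # reduced independently, then the results are concatenated back together
    outs = []
    for branch in branches:
        branch_arr = da.concatenate([arr.blocks[i] for i in branch], axis=0)
        outs.append(reducer(branch_arr))
    return da.concatenate(outs, axis=0)

x = da.random.random((12, 4), chunks=(1, 4))
out = reduce_branches(
    x,
    [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]],
    lambda a: a.mean(axis=0, keepdims=True),
)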

dcherian (Collaborator, Author) commented

Yes, some kind of shuffle will help, but the harder bit is the "tree reduction on each branch". That API basically takes Arrays, and uses Arrays internally.

PS: I'm low on bandwidth at the moment, and can only really engage here in two weeks. Too many balls in the air!

phofl commented Nov 12, 2024

No worries, let's just tackle this a bit later then.

dcherian (Collaborator, Author) commented

While working on my talk, I just remembered that you all fixed a major scheduling problem --- the one that used to make "cohorts" better than "map-reduce" in many more cases. That isn't true anymore, so we should revisit the algorithm and the heuristics for when it's a good automatic choice.

dcherian changed the title from "cohorts can be slow for really large resampling problems" to "Rework cohorts algorithm and heuristics" on Nov 12, 2024
phofl commented Nov 12, 2024

Can you elaborate a bit more? Did we break something in Dask since we fixed the ordering?

Edit: I think I misread your comment. You mean that the improved ordering behavior made map-reduce the better choice, not that we broke something since we fixed the problem, correct?

dcherian (Collaborator, Author) commented

No, your fix makes my heuristics useless :P We need to update them and choose "map-reduce" in more cases than we currently do. The graph is significantly better and easier to schedule with "map-reduce".
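For reference, the method can already be forced by hand while the heuristics are being reworked; a small example reusing ds from the first comment (the grouper below is illustrative, any by-variable works the same way; method accepts "map-reduce", "cohorts", or "blockwise"):

import flox.xarray

# group hourly data into days, explicitly requesting map-reduce
day = ds["time"].dt.floor("D").rename("day")
daily = flox.xarray.xarray_reduce(ds["U"], day, func="mean", method="map-reduce")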

phofl commented Nov 12, 2024

Phew, you had me worried there for a bit 😂

dcherian added a commit that referenced this issue Jan 15, 2025