Getitems: support meta_array
#1131
Changes from 23 commits
@@ -2,10 +2,11 @@
 import os
 from collections.abc import MutableMapping
 from string import ascii_letters, digits
-from typing import Any, List, Mapping, Optional, Union
+from typing import Any, Sequence, List, Mapping, Optional, Union

 from zarr.meta import Metadata2, Metadata3
 from zarr.util import normalize_storage_path
+from zarr.context import Context

 # v2 store keys
 array_meta_key = '.zarray'
@@ -129,6 +130,34 @@ def _ensure_store(store: Any):
                 f"wrap it in Zarr.storage.KVStore. Got {store}"
             )

+    def getitems(
+        self, keys: Sequence[str], contexts: Mapping[str, Context] = {}
+    ) -> Mapping[str, Any]:
+        """Retrieve data from multiple keys.
+
+        Parameters
+        ----------
+        keys : Iterable[str]
+            The keys to retrieve
+        contexts : Mapping[str, Context]
+            A mapping of keys to their context. Each context is a mapping of
+            store specific information. E.g. a context could be a dict telling
+            the store the preferred output array type:
+            `{"meta_array": cupy.empty(())}`
+
+        Returns
+        -------
+        Mapping
+            A collection mapping the input keys to their results.
+
+        Notes
+        -----
+        This default implementation uses __getitem__() to read each key
+        sequentially and ignores contexts. Overwrite this method to implement
+        concurrent reads of multiple keys and/or to utilize the contexts.
+        """
+        return {k: self[k] for k in keys if k in self}
Review thread (attached to the new getitems method):

- Know fsspec-based storage layers are using a […] cc @martindurant (who also may have thoughts)
- The point of it being blocking is that many keys may be being fetched concurrently. If you make it an iterator, you lose that, unless you have something that can wait on an async iterator, which in turn has first-completed working.
- I agree with both of you.
- Understood, and will be interested in how that looks. By the way, I mentioned elsewhere the possibility of passing key-specific metadata to the get function; that would happen in this same signature. I wonder if you have any use for an array where only some of it is destined for the GPU (perhaps because that data already exists there and doesn't need loading at all).
- Do you have an example of what they would look like, @martindurant?
- As noted above, I think this discussion gets hairy enough that it should be broken out into an issue (likely two) and discussed separately.
- If we want to go the […]
- Does this also imply that […]
- Good point, yes.
- Wait a second, there is no […]
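The default getitems implementation above reads keys sequentially and ignores contexts; stores are expected to override it for concurrency or context-aware reads. Below is a minimal, hypothetical sketch of such an override using a plain dict-backed store (the DictStore class and its tuple-valued results are inventions for illustration, with no zarr dependency):

```python
from typing import Any, Mapping, Sequence


class DictStore:
    """Hypothetical dict-backed store illustrating a getitems override."""

    def __init__(self) -> None:
        self._data: dict = {}

    def __setitem__(self, key: str, value: bytes) -> None:
        self._data[key] = value

    def __getitem__(self, key: str) -> bytes:
        return self._data[key]

    def getitems(
        self, keys: Sequence[str], contexts: Mapping[str, Mapping[str, Any]] = {}
    ) -> Mapping[str, Any]:
        # Missing keys are silently omitted, matching the default implementation.
        results = {}
        for k in keys:
            if k not in self._data:
                continue
            # A real store could use contexts[k]["meta_array"] here to place
            # the bytes into the preferred array type (e.g. a GPU buffer).
            # This sketch just records whether a context was supplied.
            results[k] = (self._data[k], "meta_array" in contexts.get(k, {}))
        return results


store = DictStore()
store["c/0.0"] = b"\x01"
store["c/0.1"] = b"\x02"
out = store.getitems(
    ["c/0.0", "c/0.1", "c/9.9"],
    contexts={"c/0.0": {"meta_array": object()}},
)
print(out)  # {'c/0.0': (b'\x01', True), 'c/0.1': (b'\x02', False)}
```

A concurrent store would instead dispatch all the fetches at once (e.g. via an async client) and gather the results, which is exactly why the API takes the whole key batch rather than one key at a time.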
 class Store(BaseStore):
     """Abstract store class used by implementations following the Zarr v2 spec.
@@ -0,0 +1,19 @@
+from typing import TypedDict
+
+from numcodecs.compat import NDArrayLike
+
+
+class Context(TypedDict, total=False):
+    """A context for component specific information
+
+    All keys are optional. Any component reading the context must provide
+    a default implementation in the case a key cannot be found.
+
+    Items
+    -----
+    meta_array : array-like, optional
+        An array-like instance to use for determining the preferred output
+        array type.
+    """
+    meta_array: NDArrayLike
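Because Context is declared with total=False, every key is optional, so any consumer must fall back to a default when a key is absent. A dependency-free sketch of that pattern (Any stands in for NDArrayLike, and the output_type helper is hypothetical):

```python
from typing import Any, TypedDict


class Context(TypedDict, total=False):
    """All keys are optional; readers must supply their own defaults."""
    meta_array: Any


def output_type(context: Context, default: type = list) -> type:
    # A component reading the context must not assume "meta_array" exists.
    meta = context.get("meta_array")
    return default if meta is None else type(meta)


print(output_type(Context()))                        # falls back to the default
print(output_type(Context(meta_array=bytearray())))  # type of the meta array
```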
@@ -1257,20 +1257,13 @@ def _get_selection(self, indexer, out=None, fields=None):
         else:
             check_array_shape('out', out, out_shape)

         # iterate over chunks
-        if not hasattr(self.chunk_store, "getitems") or \
-                any(map(lambda x: x == 0, self.shape)):
-            # sequentially get one key at a time from storage
-            for chunk_coords, chunk_selection, out_selection in indexer:
-
-                # load chunk selection into output array
-                self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
-                                    drop_axes=indexer.drop_axes, fields=fields)
-        else:
-            # allow storage to get multiple items at once
+        if math.prod(out_shape) > 0:
             # get chunks
             lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
-            self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
-                                 drop_axes=indexer.drop_axes, fields=fields)
+            self._chunk_getitems(
+                lchunk_coords, lchunk_selection, out, lout_selection,
+                drop_axes=indexer.drop_axes, fields=fields
+            )

         if out.shape:
             return out

Review comment (attached to the removed hasattr(self.chunk_store, "getitems") check):

- ❤️ for removing […]
@@ -1930,76 +1923,44 @@ def _process_chunk(
             # store selected data in output
             out[out_selection] = tmp

-    def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
-                       drop_axes=None, fields=None):
-        """Obtain part or whole of a chunk.
+    def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
+                        drop_axes=None, fields=None):
+        """Obtain part or whole of chunks.

         Parameters
         ----------
-        chunk_coords : tuple of ints
-            Indices of the chunk.
-        chunk_selection : selection
-            Location of region within the chunk to extract.
+        chunk_coords : list of tuple of ints
+            Indices of the chunks.
+        chunk_selection : list of selections
+            Location of region within the chunks to extract.
         out : ndarray
             Array to store result in.
-        out_selection : selection
-            Location of region within output array to store results in.
+        out_selection : list of selections
+            Location of regions within output array to store results in.
         drop_axes : tuple of ints
             Axes to squeeze out of the chunk.
         fields
             TODO

         """
         out_is_ndarray = True
         try:
             out = ensure_ndarray_like(out)
-        except TypeError:
+        except TypeError:  # pragma: no cover
             out_is_ndarray = False

-        assert len(chunk_coords) == len(self._cdata_shape)
-
-        # obtain key for chunk
-        ckey = self._chunk_key(chunk_coords)
-
-        try:
-            # obtain compressed data for chunk
-            cdata = self.chunk_store[ckey]
-
-        except KeyError:
-            # chunk not initialized
-            if self._fill_value is not None:
-                if fields:
-                    fill_value = self._fill_value[fields]
-                else:
-                    fill_value = self._fill_value
-                out[out_selection] = fill_value
-
-        else:
-            self._process_chunk(out, cdata, chunk_selection, drop_axes,
-                                out_is_ndarray, fields, out_selection)
-
-    def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
-                        drop_axes=None, fields=None):
-        """As _chunk_getitem, but for lists of chunks
-
-        This gets called where the storage supports ``getitems``, so that
-        it can decide how to fetch the keys, allowing concurrency.
-        """
-        out_is_ndarray = True
-        try:
-            out = ensure_ndarray_like(out)
-        except TypeError:  # pragma: no cover
-            out_is_ndarray = False
-
         # Keys to retrieve
         ckeys = [self._chunk_key(ch) for ch in lchunk_coords]

         partial_read_decode = False
         # Check if we can do a partial read
         if (
             self._partial_decompress
             and self._compressor
             and self._compressor.codec_id == "blosc"
             and hasattr(self._compressor, "decode_partial")
             and not fields
             and self.dtype != object
-            and hasattr(self.chunk_store, "getitems")
         ):
             partial_read_decode = True
             cdatas = {

Review thread (attached to the removed _chunk_getitem):

- Is this so private that no one could have made use of it in a subclass?
- Yes, AFAICT.
- Yeah, that's my understanding as well. Or at least by prefixing with […]
- @jakirkham: this pre-dates me, so I defer. But if it's not documented somewhere we might want to review if that holds across the board. For me, […]
- It's a Python thing generally (not specific to Zarr).
- Single underscore is by far the most common thing to do, to show intent rather than enforce any privateness (which double underscore doesn't do either).
@@ -2008,8 +1969,11 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                 if ckey in self.chunk_store
             }
         else:
-            partial_read_decode = False
-            cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
+            contexts = {}
+            if not isinstance(self._meta_array, np.ndarray):
+                contexts = {k: {"meta_array": self._meta_array} for k in ckeys}
+            cdatas = self.chunk_store.getitems(ckeys, contexts=contexts)

         for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
             if ckey in cdatas:
                 self._process_chunk(
Review thread (on the per-key contexts mapping):

- I don't understand why the context needs to be key-specific. If a user wants to load data into the GPU, surely they want to do that for the entire read, not just some chunks. It seems like we could simplify this a lot by just providing one single context to getitems, rather than a mapping. There is also likely a performance cost to doing it this way, no? If you're reading 1000s of chunks at a time, it seems silly to be carrying around all these big mappings to the same context.
- Agreed; this is to support the generic context approach, although it is possible to limit the performance impact by using a custom Mapping that returns the same value for all keys. Also notice that the current code implements the context approach where the meta_array argument is embedded within the context argument. We could roll back to my original approach where getitems() takes the meta_array as an argument directly: […]
- I think we should not try to cover generic context behavior for hypothetical use cases right now. We should strive to make the API and implementation as simple as possible to achieve the specific goal. At the same time, users of this API should recognize that it is experimental and may change in the future. I fully support developing the context concept more generically; I just don't think this PR should be blocked on that.
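The "custom Mapping that returns the same value for all keys" idea mentioned above can be sketched as a read-only constant mapping, so thousands of chunk keys share one context object instead of materializing one dict entry per key (ConstantMap here is an illustration, not necessarily zarr's actual helper):

```python
from collections.abc import Mapping


class ConstantMap(Mapping):
    """Read-only mapping that returns the same value for every key."""

    def __init__(self, keys, value):
        self._keys = frozenset(keys)   # O(number of keys), stored once
        self._value = value            # single shared context object

    def __getitem__(self, key):
        if key not in self._keys:
            raise KeyError(key)
        return self._value

    def __iter__(self):
        return iter(self._keys)

    def __len__(self):
        return len(self._keys)


ctx = {"meta_array": object()}
contexts = ConstantMap(["c/0.0", "c/0.1", "c/0.2"], ctx)
print(contexts["c/0.0"] is contexts["c/0.2"])  # True: one shared context
```

Subclassing collections.abc.Mapping means membership tests, .items(), and .get() all come for free from the three abstract methods, so the object drops into any code expecting the contexts argument.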