Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup on codecs #1889

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions src/zarr/abc/codec.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Iterable
from collections.abc import Awaitable, Callable, Iterable
from typing import TYPE_CHECKING, Generic, TypeVar

from zarr.abc.metadata import Metadata
from zarr.abc.store import ByteGetter, ByteSetter
from zarr.buffer import Buffer, NDBuffer
from zarr.common import concurrent_map
from zarr.config import config

if TYPE_CHECKING:
from typing_extensions import Self
Expand Down Expand Up @@ -59,7 +61,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
"""
return chunk_spec

def evolve(self, array_spec: ArraySpec) -> Self:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
"""Fills in codec configuration parameters that can be automatically
inferred from the array metadata.

Expand All @@ -83,7 +85,9 @@ def validate(self, array_metadata: ArrayMetadata) -> None:
"""
...

@abstractmethod
async def _decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput:
    # Per-chunk decode hook. The batched decode() entry point fans out to
    # this method (via batching_helper), so concrete codecs override this
    # instead of decode(). Default raises rather than being abstract so
    # subclasses may alternatively override decode() wholesale.
    raise NotImplementedError

async def decode(
self,
chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]],
Expand All @@ -100,9 +104,13 @@ async def decode(
-------
Iterable[CodecInput | None]
"""
...
return await batching_helper(self._decode_single, chunks_and_specs)

async def _encode_single(
    self, chunk_data: CodecInput, chunk_spec: ArraySpec
) -> CodecOutput | None:
    # Per-chunk encode hook; the batched encode() fans out to this method.
    # A None result is permitted per the return annotation — presumably
    # meaning "nothing to store" for this chunk; confirm with implementations.
    raise NotImplementedError

@abstractmethod
async def encode(
self,
chunks_and_specs: Iterable[tuple[CodecInput | None, ArraySpec]],
Expand All @@ -119,7 +127,7 @@ async def encode(
-------
Iterable[CodecOutput | None]
"""
...
return await batching_helper(self._encode_single, chunks_and_specs)


class ArrayArrayCodec(_Codec[NDBuffer, NDBuffer]):
Expand All @@ -146,7 +154,11 @@ class BytesBytesCodec(_Codec[Buffer, Buffer]):
class ArrayBytesCodecPartialDecodeMixin:
"""Mixin for array-to-bytes codecs that implement partial decoding."""

@abstractmethod
async def _decode_partial_single(
    self, byte_getter: ByteGetter, selection: SliceSelection, chunk_spec: ArraySpec
) -> NDBuffer | None:
    # Single-chunk partial-decode hook. decode_partial() maps this over a
    # batch with concurrent_map; mixin users override this method only.
    raise NotImplementedError

async def decode_partial(
self,
batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]],
Expand All @@ -167,13 +179,28 @@ async def decode_partial(
-------
Iterable[NDBuffer | None]
"""
...
return await concurrent_map(
[
(byte_getter, selection, chunk_spec)
for byte_getter, selection, chunk_spec in batch_info
],
self._decode_partial_single,
config.get("async.concurrency"),
)


class ArrayBytesCodecPartialEncodeMixin:
"""Mixin for array-to-bytes codecs that implement partial encoding."""

@abstractmethod
async def _encode_partial_single(
    self,
    byte_setter: ByteSetter,
    chunk_array: NDBuffer,
    selection: SliceSelection,
    chunk_spec: ArraySpec,
) -> None:
    # Single-chunk partial-encode hook. encode_partial() maps this over a
    # batch with concurrent_map; mixin users override this method only.
    raise NotImplementedError

async def encode_partial(
self,
batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]],
Expand All @@ -192,7 +219,14 @@ async def encode_partial(
The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data.
The chunk spec contains information about the chunk.
"""
...
await concurrent_map(
[
(byte_setter, chunk_array, selection, chunk_spec)
for byte_setter, chunk_array, selection, chunk_spec in batch_info
],
self._encode_partial_single,
config.get("async.concurrency"),
)


class CodecPipeline(Metadata):
Expand All @@ -203,7 +237,7 @@ class CodecPipeline(Metadata):
and writes them to a store (via ByteSetter)."""

@abstractmethod
def evolve(self, array_spec: ArraySpec) -> Self:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
"""Fills in codec configuration parameters that can be automatically
inferred from the array metadata.

Expand Down Expand Up @@ -347,3 +381,25 @@ async def write(
value : NDBuffer
"""
...


async def batching_helper(
    func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
    batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
) -> list[CodecOutput | None]:
    """Apply *func* to every (chunk, spec) pair concurrently.

    None chunks bypass *func* and map to None (see noop_for_none). The
    degree of concurrency is taken from the ``async.concurrency`` config key.
    """
    pairs = [(chunk, spec) for chunk, spec in batch_info]
    wrapped = noop_for_none(func)
    return await concurrent_map(pairs, wrapped, config.get("async.concurrency"))


def noop_for_none(
    func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
) -> Callable[[CodecInput | None, ArraySpec], Awaitable[CodecOutput | None]]:
    """Wrap *func* so that a None chunk short-circuits to None.

    The wrapper forwards non-None chunks (with their spec) to *func*
    unchanged; a None chunk never reaches *func* at all.
    """

    async def _passthrough(chunk: CodecInput | None, chunk_spec: ArraySpec) -> CodecOutput | None:
        return None if chunk is None else await func(chunk, chunk_spec)

    return _passthrough
30 changes: 15 additions & 15 deletions src/zarr/codecs/_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
import numcodecs
from numcodecs.compat import ensure_bytes, ensure_ndarray

from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec
from zarr.buffer import Buffer, NDBuffer
from zarr.codecs.mixins import ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin
from zarr.common import JSON, ArraySpec, to_thread


@dataclass(frozen=True)
class V2Compressor(ArrayBytesCodecBatchMixin):
class V2Compressor(ArrayBytesCodec):
compressor: dict[str, JSON] | None

is_fixed_size = False

async def decode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -38,7 +38,7 @@ async def decode_single(

return NDBuffer.from_numpy_array(chunk_numpy_array)

async def encode_single(
async def _encode_single(
self,
chunk_array: NDBuffer,
_chunk_spec: ArraySpec,
Expand All @@ -64,44 +64,44 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec)


@dataclass(frozen=True)
class V2Filters(ArrayArrayCodecBatchMixin):
class V2Filters(ArrayArrayCodec):
filters: list[dict[str, JSON]]

is_fixed_size = False

async def decode_single(
async def _decode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
chunk_numpy_array = chunk_array.as_numpy_array()
chunk_ndarray = chunk_array.as_ndarray_like()
# apply filters in reverse order
if self.filters is not None:
for filter_metadata in self.filters[::-1]:
filter = numcodecs.get_codec(filter_metadata)
chunk_numpy_array = await to_thread(filter.decode, chunk_numpy_array)
chunk_ndarray = await to_thread(filter.decode, chunk_ndarray)

# ensure correct chunk shape
if chunk_numpy_array.shape != chunk_spec.shape:
chunk_numpy_array = chunk_numpy_array.reshape(
if chunk_ndarray.shape != chunk_spec.shape:
chunk_ndarray = chunk_ndarray.reshape(
chunk_spec.shape,
order=chunk_spec.order,
)

return NDBuffer.from_numpy_array(chunk_numpy_array)
return NDBuffer.from_ndarray_like(chunk_ndarray)

async def encode_single(
async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> NDBuffer | None:
chunk_numpy_array = chunk_array.as_numpy_array().ravel(order=chunk_spec.order)
chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order)

for filter_metadata in self.filters:
filter = numcodecs.get_codec(filter_metadata)
chunk_numpy_array = await to_thread(filter.encode, chunk_numpy_array)
chunk_ndarray = await to_thread(filter.encode, chunk_ndarray)

return NDBuffer.from_numpy_array(chunk_numpy_array)
return NDBuffer.from_ndarray_like(chunk_ndarray)

def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
raise NotImplementedError
10 changes: 5 additions & 5 deletions src/zarr/codecs/blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import numcodecs
from numcodecs.blosc import Blosc

from zarr.abc.codec import BytesBytesCodec
from zarr.buffer import Buffer, as_numpy_array_wrapper
from zarr.codecs.mixins import BytesBytesCodecBatchMixin
from zarr.codecs.registry import register_codec
from zarr.common import parse_enum, parse_named_configuration, to_thread

Expand Down Expand Up @@ -74,7 +74,7 @@ def parse_blocksize(data: JSON) -> int:


@dataclass(frozen=True)
class BloscCodec(BytesBytesCodecBatchMixin):
class BloscCodec(BytesBytesCodec):
is_fixed_size = False

typesize: int
Expand Down Expand Up @@ -125,7 +125,7 @@ def to_dict(self) -> dict[str, JSON]:
},
}

def evolve(self, array_spec: ArraySpec) -> Self:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
new_codec = self
if new_codec.typesize is None:
new_codec = replace(new_codec, typesize=array_spec.dtype.itemsize)
Expand Down Expand Up @@ -158,14 +158,14 @@ def _blosc_codec(self) -> Blosc:
}
return Blosc.from_config(config_dict)

async def decode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
_chunk_spec: ArraySpec,
) -> Buffer:
return await to_thread(as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes)

async def encode_single(
async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand Down
10 changes: 5 additions & 5 deletions src/zarr/codecs/bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import numpy as np

from zarr.abc.codec import ArrayBytesCodec
from zarr.buffer import Buffer, NDBuffer
from zarr.codecs.mixins import ArrayBytesCodecBatchMixin
from zarr.codecs.registry import register_codec
from zarr.common import parse_enum, parse_named_configuration

Expand All @@ -27,7 +27,7 @@ class Endian(Enum):


@dataclass(frozen=True)
class BytesCodec(ArrayBytesCodecBatchMixin):
class BytesCodec(ArrayBytesCodec):
is_fixed_size = True

endian: Endian | None
Expand All @@ -51,7 +51,7 @@ def to_dict(self) -> dict[str, JSON]:
else:
return {"name": "bytes", "configuration": {"endian": self.endian}}

def evolve(self, array_spec: ArraySpec) -> Self:
def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
if array_spec.dtype.itemsize == 0:
if self.endian is not None:
return replace(self, endian=None)
Expand All @@ -61,7 +61,7 @@ def evolve(self, array_spec: ArraySpec) -> Self:
)
return self

async def decode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -84,7 +84,7 @@ async def decode_single(
)
return chunk_array

async def encode_single(
async def _encode_single(
self,
chunk_array: NDBuffer,
_chunk_spec: ArraySpec,
Expand Down
8 changes: 4 additions & 4 deletions src/zarr/codecs/crc32c_.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import numpy as np
from crc32c import crc32c

from zarr.abc.codec import BytesBytesCodec
from zarr.buffer import Buffer
from zarr.codecs.mixins import BytesBytesCodecBatchMixin
from zarr.codecs.registry import register_codec
from zarr.common import parse_named_configuration

Expand All @@ -18,7 +18,7 @@


@dataclass(frozen=True)
class Crc32cCodec(BytesBytesCodecBatchMixin):
class Crc32cCodec(BytesBytesCodec):
is_fixed_size = True

@classmethod
Expand All @@ -29,7 +29,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
    """Return the Zarr v3 metadata representation of this codec."""
    serialized: dict[str, JSON] = {"name": "crc32c"}
    return serialized

async def decode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
_chunk_spec: ArraySpec,
Expand All @@ -47,7 +47,7 @@ async def decode_single(
)
return Buffer.from_array_like(inner_bytes)

async def encode_single(
async def _encode_single(
self,
chunk_bytes: Buffer,
_chunk_spec: ArraySpec,
Expand Down
Loading
Loading