From 8b1275b5a62d75e585bc5714434f3a5a8a12efb1 Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Fri, 17 May 2024 09:34:42 +0200 Subject: [PATCH 1/3] rm zarr.codecs.mixins --- src/zarr/abc/codec.py | 74 +++++++++++++++++--- src/zarr/codecs/_v2.py | 14 ++-- src/zarr/codecs/blosc.py | 8 +-- src/zarr/codecs/bytes.py | 8 +-- src/zarr/codecs/crc32c_.py | 8 +-- src/zarr/codecs/gzip.py | 8 +-- src/zarr/codecs/mixins.py | 131 ----------------------------------- src/zarr/codecs/sharding.py | 27 ++++---- src/zarr/codecs/transpose.py | 8 +-- src/zarr/codecs/zstd.py | 8 +-- 10 files changed, 110 insertions(+), 184 deletions(-) delete mode 100644 src/zarr/codecs/mixins.py diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 1c665590bf..4bd046eaee 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -1,11 +1,13 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING, Generic, Iterable, TypeVar +from typing import TYPE_CHECKING, Awaitable, Callable, Generic, Iterable, TypeVar +from zarr.config import config from zarr.abc.metadata import Metadata from zarr.abc.store import ByteGetter, ByteSetter from zarr.buffer import Buffer, NDBuffer +from zarr.common import concurrent_map if TYPE_CHECKING: @@ -82,7 +84,9 @@ def validate(self, array_metadata: ArrayMetadata) -> None: """ ... - @abstractmethod + async def _decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput: + raise NotImplementedError + async def decode( self, chunks_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]], @@ -99,9 +103,13 @@ async def decode( ------- Iterable[CodecInput | None] """ - ... + return await batching_helper(self._decode_single, chunks_and_specs) + + async def _encode_single( + self, chunk_data: CodecInput, chunk_spec: ArraySpec + ) -> CodecOutput | None: + raise NotImplementedError - @abstractmethod async def encode( self, chunks_and_specs: Iterable[tuple[CodecInput | None, ArraySpec]], @@ -118,7 +126,7 @@ async def encode( ------- Iterable[CodecOutput | None] """ - ... + return await batching_helper(self._encode_single, chunks_and_specs) class ArrayArrayCodec(_Codec[NDBuffer, NDBuffer]): @@ -145,7 +153,11 @@ class BytesBytesCodec(_Codec[Buffer, Buffer]): class ArrayBytesCodecPartialDecodeMixin: """Mixin for array-to-bytes codecs that implement partial decoding.""" - @abstractmethod + async def _decode_partial_single( + self, byte_getter: ByteGetter, selection: SliceSelection, chunk_spec: ArraySpec + ) -> NDBuffer | None: + raise NotImplementedError + async def decode_partial( self, batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]], @@ -166,13 +178,28 @@ async def decode_partial( ------- Iterable[NDBuffer | None] """ - ... + return await concurrent_map( + [ + (byte_getter, selection, chunk_spec) + for byte_getter, selection, chunk_spec in batch_info + ], + self._decode_partial_single, + config.get("async.concurrency"), + ) class ArrayBytesCodecPartialEncodeMixin: """Mixin for array-to-bytes codecs that implement partial encoding.""" - @abstractmethod + async def _encode_partial_single( + self, + byte_setter: ByteSetter, + chunk_array: NDBuffer, + selection: SliceSelection, + chunk_spec: ArraySpec, + ) -> None: + raise NotImplementedError + async def encode_partial( self, batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]], @@ -191,7 +218,14 @@ async def encode_partial( The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data. The chunk spec contains information about the chunk. """ - ... + await concurrent_map( + [ + (byte_setter, chunk_array, selection, chunk_spec) + for byte_setter, chunk_array, selection, chunk_spec in batch_info + ], + self._encode_partial_single, + config.get("async.concurrency"), + ) class CodecPipeline(Metadata): @@ -346,3 +380,25 @@ async def write( value : NDBuffer """ ... + + +async def batching_helper( + func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], + batch_info: Iterable[tuple[CodecInput | None, ArraySpec]], +) -> list[CodecOutput | None]: + return await concurrent_map( + [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info], + noop_for_none(func), + config.get("async.concurrency"), + ) + + +def noop_for_none( + func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], +) -> Callable[[CodecInput | None, ArraySpec], Awaitable[CodecOutput | None]]: + async def wrap(chunk: CodecInput | None, chunk_spec: ArraySpec) -> CodecOutput | None: + if chunk is None: + return None + return await func(chunk, chunk_spec) + + return wrap diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index fb7122600f..94c7f1fa7c 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -3,7 +3,7 @@ from dataclasses import dataclass from zarr.buffer import Buffer, NDBuffer -from zarr.codecs.mixins import ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin +from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec from zarr.common import JSON, ArraySpec, to_thread import numcodecs @@ -11,12 +11,12 @@ @dataclass(frozen=True) -class V2Compressor(ArrayBytesCodecBatchMixin): +class V2Compressor(ArrayBytesCodec): compressor: dict[str, JSON] | None is_fixed_size = False - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -38,7 +38,7 @@ async def decode_single( return NDBuffer.from_numpy_array(chunk_numpy_array) - async def encode_single( + async def _encode_single( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, @@ -64,12 +64,12 @@ def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) @dataclass(frozen=True) -class V2Filters(ArrayArrayCodecBatchMixin): +class V2Filters(ArrayArrayCodec): filters: list[dict[str, JSON]] is_fixed_size = False - async def decode_single( + async def _decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -90,7 +90,7 @@ async def decode_single( return NDBuffer.from_numpy_array(chunk_numpy_array) - async def encode_single( + async def _encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index ab3ffab479..443e5969db 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -8,7 +8,7 @@ import numcodecs from numcodecs.blosc import Blosc -from zarr.codecs.mixins import BytesBytesCodecBatchMixin +from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration, to_thread @@ -74,7 +74,7 @@ def parse_blocksize(data: JSON) -> int: @dataclass(frozen=True) -class BloscCodec(BytesBytesCodecBatchMixin): +class BloscCodec(BytesBytesCodec): is_fixed_size = False typesize: int @@ -158,14 +158,14 @@ def _blosc_codec(self) -> Blosc: } return Blosc.from_config(config_dict) - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes) - async def encode_single( + async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 6df78a08b8..00a7f78121 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -7,7 +7,7 @@ import numpy as np -from zarr.codecs.mixins import ArrayBytesCodecBatchMixin +from zarr.abc.codec import ArrayBytesCodec from zarr.buffer import Buffer, NDBuffer from zarr.codecs.registry import register_codec from zarr.common import parse_enum, parse_named_configuration @@ -26,7 +26,7 @@ class Endian(Enum): @dataclass(frozen=True) -class BytesCodec(ArrayBytesCodecBatchMixin): +class BytesCodec(ArrayBytesCodec): is_fixed_size = True endian: Optional[Endian] @@ -60,7 +60,7 @@ def evolve(self, array_spec: ArraySpec) -> Self: ) return self - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -83,7 +83,7 @@ async def decode_single( ) return chunk_array - async def encode_single( + async def _encode_single( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index ab4bad65fe..5118b87bae 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -7,7 +7,7 @@ from crc32c import crc32c -from zarr.codecs.mixins import BytesBytesCodecBatchMixin +from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration @@ -19,7 +19,7 @@ @dataclass(frozen=True) -class Crc32cCodec(BytesBytesCodecBatchMixin): +class Crc32cCodec(BytesBytesCodec): is_fixed_size = True @classmethod @@ -30,7 +30,7 @@ def from_dict(cls, data: Dict[str, JSON]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "crc32c"} - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, @@ -48,7 +48,7 @@ async def decode_single( ) return Buffer.from_array_like(inner_bytes) - async def encode_single( + async def _encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index 6a8e30db13..fd1fe77162 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -4,7 +4,7 @@ from typing import TYPE_CHECKING from numcodecs.gzip import GZip -from zarr.codecs.mixins import BytesBytesCodecBatchMixin +from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread @@ -26,7 +26,7 @@ def parse_gzip_level(data: JSON) -> int: @dataclass(frozen=True) -class GzipCodec(BytesBytesCodecBatchMixin): +class GzipCodec(BytesBytesCodec): is_fixed_size = False level: int = 5 @@ -44,14 +44,14 @@ def from_dict(cls, data: Dict[str, JSON]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "gzip", "configuration": {"level": self.level}} - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes) - async def encode_single( + async def _encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/mixins.py b/src/zarr/codecs/mixins.py deleted file mode 100644 index 8b0a684509..0000000000 --- a/src/zarr/codecs/mixins.py +++ /dev/null @@ -1,131 +0,0 @@ -from __future__ import annotations - -from abc import abstractmethod -from typing import Awaitable, Callable, Generic, Iterable, TypeVar - - -from zarr.abc.codec import ( - ArrayArrayCodec, - ArrayBytesCodec, - ArrayBytesCodecPartialDecodeMixin, - ArrayBytesCodecPartialEncodeMixin, - ByteGetter, - ByteSetter, - BytesBytesCodec, -) -from zarr.buffer import Buffer, NDBuffer -from zarr.common import ArraySpec, SliceSelection, concurrent_map -from zarr.config import config - - -CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer) -CodecOutput = TypeVar("CodecOutput", bound=NDBuffer | Buffer) - - -async def batching_helper( - func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], - batch_info: Iterable[tuple[CodecInput | None, ArraySpec]], -) -> list[CodecOutput | None]: - return await concurrent_map( - [(chunk_array, chunk_spec) for chunk_array, chunk_spec in batch_info], - noop_for_none(func), - config.get("async.concurrency"), - ) - - -def noop_for_none( - func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], -) -> Callable[[CodecInput | None, ArraySpec], Awaitable[CodecOutput | None]]: - async def wrap(chunk: CodecInput | None, chunk_spec: ArraySpec) -> CodecOutput | None: - if chunk is None: - return None - return await func(chunk, chunk_spec) - - return wrap - - -class CodecBatchMixin(Generic[CodecInput, CodecOutput]): - """The default interface from the Codec class expects batches of codecs. - However, many codec implementation operate on single codecs. - This mixin provides abstract methods for decode_single and encode_single and - implements batching through concurrent processing. - - Use ArrayArrayCodecBatchMixin, ArrayBytesCodecBatchMixin and BytesBytesCodecBatchMixin - for subclassing. - """ - - @abstractmethod - async def decode_single(self, chunk_data: CodecOutput, chunk_spec: ArraySpec) -> CodecInput: - pass - - async def decode( - self, chunk_data_and_specs: Iterable[tuple[CodecOutput | None, ArraySpec]] - ) -> Iterable[CodecInput | None]: - return await batching_helper(self.decode_single, chunk_data_and_specs) - - @abstractmethod - async def encode_single( - self, chunk_data: CodecInput, chunk_spec: ArraySpec - ) -> CodecOutput | None: - pass - - async def encode( - self, chunk_data_and_specs: Iterable[tuple[CodecInput | None, ArraySpec]] - ) -> Iterable[CodecOutput | None]: - return await batching_helper(self.encode_single, chunk_data_and_specs) - - -class ArrayArrayCodecBatchMixin(CodecBatchMixin[NDBuffer, NDBuffer], ArrayArrayCodec): - pass - - -class ArrayBytesCodecBatchMixin(CodecBatchMixin[NDBuffer, Buffer], ArrayBytesCodec): - pass - - -class BytesBytesCodecBatchMixin(CodecBatchMixin[Buffer, Buffer], BytesBytesCodec): - pass - - -class ArrayBytesCodecPartialDecodeBatchMixin(ArrayBytesCodecPartialDecodeMixin): - @abstractmethod - async def decode_partial_single( - self, byte_getter: ByteGetter, selection: SliceSelection, chunk_spec: ArraySpec - ) -> NDBuffer | None: - pass - - async def decode_partial( - self, batch_info: Iterable[tuple[ByteGetter, SliceSelection, ArraySpec]] - ) -> Iterable[NDBuffer | None]: - return await concurrent_map( - [ - (byte_getter, selection, chunk_spec) - for byte_getter, selection, chunk_spec in batch_info - ], - self.decode_partial_single, - config.get("async.concurrency"), - ) - - -class ArrayBytesCodecPartialEncodeBatchMixin(ArrayBytesCodecPartialEncodeMixin): - @abstractmethod - async def encode_partial_single( - self, - byte_setter: ByteSetter, - chunk_array: NDBuffer, - selection: SliceSelection, - chunk_spec: ArraySpec, - ) -> None: - pass - - async def encode_partial( - self, batch_info: Iterable[tuple[ByteSetter, NDBuffer, SliceSelection, ArraySpec]] - ) -> None: - await concurrent_map( - [ - (byte_setter, chunk_array, selection, chunk_spec) - for byte_setter, chunk_array, selection, chunk_spec in batch_info - ], - self.encode_partial_single, - config.get("async.concurrency"), - ) diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index dd7cdcd0b4..131c56df05 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -6,14 +6,17 @@ import numpy as np -from zarr.abc.codec import ByteGetter, ByteSetter, Codec, CodecPipeline +from zarr.abc.codec import ( + ArrayBytesCodec, + ArrayBytesCodecPartialDecodeMixin, + ArrayBytesCodecPartialEncodeMixin, + ByteGetter, + ByteSetter, + Codec, + CodecPipeline, +) from zarr.codecs.bytes import BytesCodec from zarr.codecs.crc32c_ import Crc32cCodec -from zarr.codecs.mixins import ( - ArrayBytesCodecBatchMixin, - ArrayBytesCodecPartialDecodeBatchMixin, - ArrayBytesCodecPartialEncodeBatchMixin, -) from zarr.codecs.pipeline import BatchedCodecPipeline from zarr.codecs.registry import register_codec from zarr.common import ( @@ -286,9 +289,7 @@ async def finalize( @dataclass(frozen=True) class ShardingCodec( - ArrayBytesCodecBatchMixin, - ArrayBytesCodecPartialDecodeBatchMixin, - ArrayBytesCodecPartialEncodeBatchMixin, + ArrayBytesCodec, ArrayBytesCodecPartialDecodeMixin, ArrayBytesCodecPartialEncodeMixin ): chunk_shape: ChunkCoords codecs: CodecPipeline @@ -373,7 +374,7 @@ def validate(self, array_metadata: ArrayMetadata) -> None: + "shard's inner `chunk_shape`." ) - async def decode_single( + async def _decode_single( self, shard_bytes: Buffer, shard_spec: ArraySpec, @@ -415,7 +416,7 @@ async def decode_single( return out - async def decode_partial_single( + async def _decode_partial_single( self, byte_getter: ByteGetter, selection: SliceSelection, @@ -476,7 +477,7 @@ async def decode_partial_single( ) return out - async def encode_single( + async def _encode_single( self, shard_array: NDBuffer, shard_spec: ArraySpec, @@ -511,7 +512,7 @@ async def encode_single( return await shard_builder.finalize(self.index_location, self._encode_shard_index) - async def encode_partial_single( + async def _encode_partial_single( self, byte_setter: ByteSetter, shard_array: NDBuffer, diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index 5d4d2a7b84..4e975b1edc 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -3,7 +3,7 @@ from dataclasses import dataclass, replace -from zarr.codecs.mixins import ArrayArrayCodecBatchMixin +from zarr.abc.codec import ArrayArrayCodec from zarr.buffer import NDBuffer from zarr.common import JSON, ArraySpec, ChunkCoordsLike, parse_named_configuration from zarr.codecs.registry import register_codec @@ -22,7 +22,7 @@ def parse_transpose_order(data: Union[JSON, Iterable[int]]) -> Tuple[int, ...]: @dataclass(frozen=True) -class TransposeCodec(ArrayArrayCodecBatchMixin): +class TransposeCodec(ArrayArrayCodec): is_fixed_size = True order: Tuple[int, ...] @@ -71,7 +71,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: order=chunk_spec.order, ) - async def decode_single( + async def _decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -82,7 +82,7 @@ async def decode_single( chunk_array = chunk_array.transpose(inverse_order) return chunk_array - async def encode_single( + async def _encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index 4422188d25..fa9fd47494 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -5,7 +5,7 @@ from zstandard import ZstdCompressor, ZstdDecompressor -from zarr.codecs.mixins import BytesBytesCodecBatchMixin +from zarr.abc.codec import BytesBytesCodec from zarr.buffer import Buffer, as_numpy_array_wrapper from zarr.codecs.registry import register_codec from zarr.common import parse_named_configuration, to_thread @@ -31,7 +31,7 @@ def parse_checksum(data: JSON) -> bool: @dataclass(frozen=True) -class ZstdCodec(BytesBytesCodecBatchMixin): +class ZstdCodec(BytesBytesCodec): is_fixed_size = True level: int = 0 @@ -60,14 +60,14 @@ def _decompress(self, data: bytes) -> bytes: ctx = ZstdDecompressor() return ctx.decompress(data) - async def decode_single( + async def _decode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, ) -> Buffer: return await to_thread(as_numpy_array_wrapper, self._decompress, chunk_bytes) - async def encode_single( + async def _encode_single( self, chunk_bytes: Buffer, _chunk_spec: ArraySpec, From 82ee4be75094f0d4144be15cc69524f3cd5dca2c Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Fri, 17 May 2024 09:36:22 +0200 Subject: [PATCH 2/3] evolve -> evolve_from_array_spec --- src/zarr/abc/codec.py | 4 ++-- src/zarr/codecs/blosc.py | 2 +- src/zarr/codecs/bytes.py | 2 +- src/zarr/codecs/pipeline.py | 4 ++-- src/zarr/codecs/sharding.py | 4 ++-- src/zarr/codecs/transpose.py | 2 +- src/zarr/metadata.py | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py index 4bd046eaee..0c9d1cbcf4 100644 --- a/src/zarr/abc/codec.py +++ b/src/zarr/abc/codec.py @@ -60,7 +60,7 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: """ return chunk_spec - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. @@ -236,7 +236,7 @@ class CodecPipeline(Metadata): and writes them to a store (via ByteSetter).""" @abstractmethod - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: """Fills in codec configuration parameters that can be automatically inferred from the array metadata. diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 443e5969db..e9df790f23 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -125,7 +125,7 @@ def to_dict(self) -> Dict[str, JSON]: }, } - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: new_codec = self if new_codec.typesize is None: new_codec = replace(new_codec, typesize=array_spec.dtype.itemsize) diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 00a7f78121..d36ad144d4 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -50,7 +50,7 @@ def to_dict(self) -> Dict[str, JSON]: else: return {"name": "bytes", "configuration": {"endian": self.endian}} - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: if array_spec.dtype.itemsize == 0: if self.endian is not None: return replace(self, endian=None) diff --git a/src/zarr/codecs/pipeline.py b/src/zarr/codecs/pipeline.py index 8396a0c2ce..dd8caabce0 100644 --- a/src/zarr/codecs/pipeline.py +++ b/src/zarr/codecs/pipeline.py @@ -85,8 +85,8 @@ def from_dict(cls, data: Iterable[JSON | Codec], *, batch_size: int | None = Non def to_dict(self) -> JSON: return [c.to_dict() for c in self] - def evolve(self, array_spec: ArraySpec) -> Self: - return type(self).from_list([c.evolve(array_spec) for c in self]) + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + return type(self).from_list([c.evolve_from_array_spec(array_spec) for c in self]) @staticmethod def codecs_from_list( diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 131c56df05..0274678b36 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -347,9 +347,9 @@ def to_dict(self) -> Dict[str, JSON]: }, } - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: shard_spec = self._get_chunk_spec(array_spec) - evolved_codecs = self.codecs.evolve(shard_spec) + evolved_codecs = self.codecs.evolve_from_array_spec(shard_spec) if evolved_codecs != self.codecs: return replace(self, codecs=evolved_codecs) return self diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index 4e975b1edc..0124304f0f 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -40,7 +40,7 @@ def from_dict(cls, data: Dict[str, JSON]) -> Self: def to_dict(self) -> Dict[str, JSON]: return {"name": "transpose", "configuration": {"order": list(self.order)}} - def evolve(self, array_spec: ArraySpec) -> Self: + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: if len(self.order) != array_spec.ndim: raise ValueError( "The `order` tuple needs have as many entries as " diff --git a/src/zarr/metadata.py b/src/zarr/metadata.py index 695d83da55..9f4a659fb0 100644 --- a/src/zarr/metadata.py +++ b/src/zarr/metadata.py @@ -192,7 +192,7 @@ def __init__( fill_value=fill_value_parsed, order="C", # TODO: order is not needed here. ) - codecs_parsed = parse_codecs(codecs).evolve(array_spec) + codecs_parsed = parse_codecs(codecs).evolve_from_array_spec(array_spec) object.__setattr__(self, "shape", shape_parsed) object.__setattr__(self, "data_type", data_type_parsed) From 304141d0b11c2a0a6912ac0aa76797e843aea8ac Mon Sep 17 00:00:00 2001 From: Norman Rzepka Date: Fri, 17 May 2024 09:56:38 +0200 Subject: [PATCH 3/3] use as_ndarray_like --- src/zarr/codecs/_v2.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/zarr/codecs/_v2.py b/src/zarr/codecs/_v2.py index 94c7f1fa7c..a683f95aba 100644 --- a/src/zarr/codecs/_v2.py +++ b/src/zarr/codecs/_v2.py @@ -74,34 +74,34 @@ async def _decode_single( chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> NDBuffer: - chunk_numpy_array = chunk_array.as_numpy_array() + chunk_ndarray = chunk_array.as_ndarray_like() # apply filters in reverse order if self.filters is not None: for filter_metadata in self.filters[::-1]: filter = numcodecs.get_codec(filter_metadata) - chunk_numpy_array = await to_thread(filter.decode, chunk_numpy_array) + chunk_ndarray = await to_thread(filter.decode, chunk_ndarray) # ensure correct chunk shape - if chunk_numpy_array.shape != chunk_spec.shape: - chunk_numpy_array = chunk_numpy_array.reshape( + if chunk_ndarray.shape != chunk_spec.shape: + chunk_ndarray = chunk_ndarray.reshape( chunk_spec.shape, order=chunk_spec.order, ) - return NDBuffer.from_numpy_array(chunk_numpy_array) + return NDBuffer.from_ndarray_like(chunk_ndarray) async def _encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> NDBuffer | None: - chunk_numpy_array = chunk_array.as_numpy_array().ravel(order=chunk_spec.order) + chunk_ndarray = chunk_array.as_ndarray_like().ravel(order=chunk_spec.order) for filter_metadata in self.filters: filter = numcodecs.get_codec(filter_metadata) - chunk_numpy_array = await to_thread(filter.encode, chunk_numpy_array) + chunk_ndarray = await to_thread(filter.encode, chunk_ndarray) - return NDBuffer.from_numpy_array(chunk_numpy_array) + return NDBuffer.from_ndarray_like(chunk_ndarray) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError