Skip to content

Commit ae0580c

Browse files
committed
fixup
1 parent 9e25150 commit ae0580c

4 files changed

Lines changed: 32 additions & 246 deletions

File tree

src/zarr/abc/codec.py

Lines changed: 10 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
if TYPE_CHECKING:
1616
from collections.abc import Awaitable, Callable, Iterable
17-
from typing import Any, Self
17+
from typing import Self
1818

1919
from zarr.abc.store import ByteGetter, ByteSetter, Store
2020
from zarr.core.array_spec import ArraySpec
@@ -36,7 +36,6 @@
3636
"GetResult",
3737
"PreparedWrite",
3838
"SupportsChunkCodec",
39-
"SupportsChunkMapping",
4039
"SupportsSyncCodec",
4140
]
4241

@@ -89,117 +88,20 @@ def _encode_sync(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None: ...
8988
class SupportsChunkCodec(Protocol):
9089
"""Protocol for objects that can decode/encode whole chunks synchronously.
9190
92-
`ChunkTransform` satisfies this protocol.
91+
`ChunkTransform` satisfies this protocol. The ``chunk_shape`` parameter
92+
allows decoding/encoding chunks of different shapes (e.g. rectilinear
93+
grids) without rebuilding the transform.
9394
"""
9495

9596
array_spec: ArraySpec
9697

97-
def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ...
98+
def decode_chunk(
99+
self, chunk_bytes: Buffer, chunk_shape: tuple[int, ...] | None = None
100+
) -> NDBuffer: ...
98101

99-
def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...
100-
101-
102-
@runtime_checkable
103-
class SupportsChunkMapping(Protocol):
104-
"""Protocol for codecs that expose their stored data as a mapping
105-
from chunk coordinates to encoded buffers.
106-
107-
A single store key holds a blob. This protocol defines how to
108-
interpret that blob as a ``dict[tuple[int, ...], Buffer | None]`` —
109-
a mapping from inner-chunk coordinates to their encoded bytes.
110-
111-
For a non-sharded codec (``BytesCodec``), the mapping is trivial:
112-
one entry at ``(0,)`` containing the entire blob. For a sharded
113-
codec, the mapping has one entry per inner chunk, derived from the
114-
shard index embedded in the blob. The pipeline doesn't need to know
115-
which case it's dealing with — it operates on the mapping uniformly.
116-
117-
This abstraction enables the three-phase IO/compute/IO pattern:
118-
119-
1. **IO**: fetch the blob from the store.
120-
2. **Compute**: unpack the blob into the chunk mapping, decode/merge/
121-
re-encode entries, pack back into a blob. All pure compute.
122-
3. **IO**: write the blob to the store.
123-
"""
124-
125-
@property
126-
def inner_codec_chain(self) -> SupportsChunkCodec | None:
127-
"""The codec chain for inner chunks, or `None` to use the pipeline's."""
128-
...
129-
130-
def unpack_chunks(
131-
self,
132-
raw: Buffer | None,
133-
chunk_spec: ArraySpec,
134-
) -> dict[tuple[int, ...], Buffer | None]:
135-
"""Unpack a storage blob into per-inner-chunk encoded buffers."""
136-
...
137-
138-
def pack_chunks(
139-
self,
140-
chunk_dict: dict[tuple[int, ...], Buffer | None],
141-
chunk_spec: ArraySpec,
142-
) -> Buffer | None:
143-
"""Pack per-inner-chunk encoded buffers into a single storage blob."""
144-
...
145-
146-
def prepare_read_sync(
147-
self,
148-
byte_getter: Any,
149-
chunk_selection: SelectorTuple,
150-
codec_chain: SupportsChunkCodec,
151-
) -> NDBuffer | None:
152-
"""Fetch and decode a chunk synchronously, returning the selected region."""
153-
...
154-
155-
def prepare_write_sync(
156-
self,
157-
byte_setter: Any,
158-
codec_chain: SupportsChunkCodec,
159-
chunk_selection: SelectorTuple,
160-
out_selection: SelectorTuple,
161-
replace: bool,
162-
) -> PreparedWrite:
163-
"""Prepare a synchronous write: fetch existing data if needed, unpack."""
164-
...
165-
166-
def finalize_write_sync(
167-
self,
168-
prepared: PreparedWrite,
169-
chunk_spec: ArraySpec,
170-
byte_setter: Any,
171-
) -> None:
172-
"""Pack the prepared chunk data and write it to the store."""
173-
...
174-
175-
async def prepare_read(
176-
self,
177-
byte_getter: Any,
178-
chunk_selection: SelectorTuple,
179-
codec_chain: SupportsChunkCodec,
180-
) -> NDBuffer | None:
181-
"""Async variant of `prepare_read_sync`."""
182-
...
183-
184-
async def prepare_write(
185-
self,
186-
byte_setter: Any,
187-
codec_chain: SupportsChunkCodec,
188-
chunk_selection: SelectorTuple,
189-
out_selection: SelectorTuple,
190-
replace: bool,
191-
) -> PreparedWrite:
192-
"""Async variant of `prepare_write_sync`."""
193-
...
194-
195-
async def finalize_write(
196-
self,
197-
prepared: PreparedWrite,
198-
chunk_spec: ArraySpec,
199-
byte_setter: Any,
200-
) -> None:
201-
"""Async variant of `finalize_write_sync`."""
202-
...
102+
def encode_chunk(
103+
self, chunk_array: NDBuffer, chunk_shape: tuple[int, ...] | None = None
104+
) -> Buffer | None: ...
203105

204106

205107
class BaseCodec[CI: CodecInput, CO: CodecOutput](Metadata):

src/zarr/codecs/bytes.py

Lines changed: 2 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
from enum import Enum
66
from typing import TYPE_CHECKING
77

8-
from zarr.abc.codec import ArrayBytesCodec, PreparedWrite, SupportsChunkCodec
8+
from zarr.abc.codec import ArrayBytesCodec
99
from zarr.core.buffer import Buffer, NDBuffer
1010
from zarr.core.common import JSON, parse_enum, parse_named_configuration
1111
from zarr.core.dtype.common import HasEndianness
1212

1313
if TYPE_CHECKING:
14-
from typing import Any, Self
14+
from typing import Self
1515

1616
from zarr.core.array_spec import ArraySpec
17-
from zarr.core.indexing import SelectorTuple
1817

1918

2019
class Endian(Enum):
@@ -126,114 +125,3 @@ async def _encode_single(
126125

127126
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
128127
return input_byte_length
129-
130-
# -- SupportsChunkMapping --
131-
132-
@property
133-
def inner_codec_chain(self) -> SupportsChunkCodec | None:
134-
"""Returns `None` — the pipeline should use its own codec chain."""
135-
return None
136-
137-
def unpack_chunks(
138-
self,
139-
raw: Buffer | None,
140-
chunk_spec: ArraySpec,
141-
) -> dict[tuple[int, ...], Buffer | None]:
142-
"""Single chunk keyed at `(0,)`."""
143-
return {(0,): raw}
144-
145-
def pack_chunks(
146-
self,
147-
chunk_dict: dict[tuple[int, ...], Buffer | None],
148-
chunk_spec: ArraySpec,
149-
) -> Buffer | None:
150-
"""Return the single chunk's bytes."""
151-
return chunk_dict.get((0,))
152-
153-
def prepare_read_sync(
154-
self,
155-
byte_getter: Any,
156-
chunk_selection: SelectorTuple,
157-
codec_chain: SupportsChunkCodec,
158-
) -> NDBuffer | None:
159-
"""Fetch, decode, and return the selected region synchronously."""
160-
raw = byte_getter.get_sync(prototype=codec_chain.array_spec.prototype)
161-
if raw is None:
162-
return None
163-
chunk_array = codec_chain.decode_chunk(raw)
164-
return chunk_array[chunk_selection]
165-
166-
def prepare_write_sync(
167-
self,
168-
byte_setter: Any,
169-
codec_chain: SupportsChunkCodec,
170-
chunk_selection: SelectorTuple,
171-
out_selection: SelectorTuple,
172-
replace: bool,
173-
) -> PreparedWrite:
174-
"""Fetch existing data if needed, unpack, return `PreparedWrite`."""
175-
from zarr.core.indexing import ChunkProjection
176-
177-
existing: Buffer | None = None
178-
if not replace:
179-
existing = byte_setter.get_sync(prototype=codec_chain.array_spec.prototype)
180-
chunk_dict = self.unpack_chunks(existing, codec_chain.array_spec)
181-
indexer = [ChunkProjection((0,), chunk_selection, out_selection, replace)] # type: ignore[arg-type]
182-
return PreparedWrite(chunk_dict=chunk_dict, indexer=indexer)
183-
184-
def finalize_write_sync(
185-
self,
186-
prepared: PreparedWrite,
187-
chunk_spec: ArraySpec,
188-
byte_setter: Any,
189-
) -> None:
190-
"""Pack and write to store, or delete if empty."""
191-
blob = self.pack_chunks(prepared.chunk_dict, chunk_spec)
192-
if blob is None:
193-
byte_setter.delete_sync()
194-
else:
195-
byte_setter.set_sync(blob)
196-
197-
async def prepare_read(
198-
self,
199-
byte_getter: Any,
200-
chunk_selection: SelectorTuple,
201-
codec_chain: SupportsChunkCodec,
202-
) -> NDBuffer | None:
203-
"""Async variant of `prepare_read_sync`."""
204-
raw = await byte_getter.get(prototype=codec_chain.array_spec.prototype)
205-
if raw is None:
206-
return None
207-
chunk_array = codec_chain.decode_chunk(raw)
208-
return chunk_array[chunk_selection]
209-
210-
async def prepare_write(
211-
self,
212-
byte_setter: Any,
213-
codec_chain: SupportsChunkCodec,
214-
chunk_selection: SelectorTuple,
215-
out_selection: SelectorTuple,
216-
replace: bool,
217-
) -> PreparedWrite:
218-
"""Async variant of `prepare_write_sync`."""
219-
from zarr.core.indexing import ChunkProjection
220-
221-
existing: Buffer | None = None
222-
if not replace:
223-
existing = await byte_setter.get(prototype=codec_chain.array_spec.prototype)
224-
chunk_dict = self.unpack_chunks(existing, codec_chain.array_spec)
225-
indexer = [ChunkProjection((0,), chunk_selection, out_selection, replace)] # type: ignore[arg-type]
226-
return PreparedWrite(chunk_dict=chunk_dict, indexer=indexer)
227-
228-
async def finalize_write(
229-
self,
230-
prepared: PreparedWrite,
231-
chunk_spec: ArraySpec,
232-
byte_setter: Any,
233-
) -> None:
234-
"""Async variant of `finalize_write_sync`."""
235-
blob = self.pack_chunks(prepared.chunk_dict, chunk_spec)
236-
if blob is None:
237-
await byte_setter.delete()
238-
else:
239-
await byte_setter.set(blob)

src/zarr/core/array.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -229,29 +229,23 @@ def create_codec_pipeline(metadata: ArrayMetadata, *, store: Store | None = None
229229

230230
if isinstance(metadata, ArrayV3Metadata):
231231
pipeline = get_pipeline_class().from_codecs(metadata.codecs)
232-
# PhasedCodecPipeline needs evolve_from_array_spec to build its
233-
# ChunkTransform and ShardLayout. BatchedCodecPipeline does not.
234-
if hasattr(pipeline, "chunk_transform") and pipeline.chunk_transform is None:
235-
from zarr.core.metadata.v3 import RegularChunkGridMetadata
236-
237-
# Use the regular chunk shape if available, otherwise use a
238-
# placeholder shape. The ChunkTransform is shape-agnostic —
239-
# the actual chunk shape is passed per-call at decode/encode time.
240-
if isinstance(metadata.chunk_grid, RegularChunkGridMetadata):
241-
chunk_shape = metadata.chunk_grid.chunk_shape
242-
else:
243-
# Rectilinear: use a 1-element shape per dimension as placeholder.
244-
# Only dtype/fill_value/config matter for codec evolution.
245-
chunk_shape = (1,) * len(metadata.shape)
246-
chunk_spec = ArraySpec(
247-
shape=chunk_shape,
248-
dtype=metadata.data_type,
249-
fill_value=metadata.fill_value,
250-
config=ArrayConfig.from_dict({}),
251-
prototype=default_buffer_prototype(),
252-
)
253-
pipeline = pipeline.evolve_from_array_spec(chunk_spec)
254-
return pipeline
232+
from zarr.core.metadata.v3 import RegularChunkGridMetadata
233+
234+
# Use the regular chunk shape if available, otherwise use a
235+
# placeholder. The ChunkTransform is shape-agnostic — the actual
236+
# chunk shape is passed per-call at decode/encode time.
237+
if isinstance(metadata.chunk_grid, RegularChunkGridMetadata):
238+
chunk_shape = metadata.chunk_grid.chunk_shape
239+
else:
240+
chunk_shape = (1,) * len(metadata.shape)
241+
chunk_spec = ArraySpec(
242+
shape=chunk_shape,
243+
dtype=metadata.data_type,
244+
fill_value=metadata.fill_value,
245+
config=ArrayConfig.from_dict({}),
246+
prototype=default_buffer_prototype(),
247+
)
248+
return pipeline.evolve_from_array_spec(chunk_spec)
255249
elif isinstance(metadata, ArrayV2Metadata):
256250
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
257251
return get_pipeline_class().from_codecs([v2_codec])

src/zarr/core/codec_pipeline.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,11 +656,13 @@ def codecs_from_list(
656656
) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]:
657657
from zarr.codecs.sharding import ShardingCodec
658658

659+
codecs = tuple(codecs) # materialize to avoid generator consumption issues
660+
659661
array_array: tuple[ArrayArrayCodec, ...] = ()
660662
array_bytes_maybe: ArrayBytesCodec | None = None
661663
bytes_bytes: tuple[BytesBytesCodec, ...] = ()
662664

663-
if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(tuple(codecs)) > 1:
665+
if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1:
664666
warn(
665667
"Combining a `sharding_indexed` codec disables partial reads and "
666668
"writes, which may lead to inefficient performance.",

0 commit comments

Comments
 (0)