Skip to content

Commit a75604a

Browse files
committed
feat: define PreparedWrite and SupportsChunkPacking data structures
`PreparedWrite` models a set of per-chunk changes that would be applied to a stored chunk. `SupportsChunkPacking` is a protocol for array -> bytes codecs that can use `PreparedWrite` objects to update an existing chunk.
1 parent ca9bd3e commit a75604a

4 files changed

Lines changed: 266 additions & 9 deletions

File tree

src/zarr/abc/codec.py

Lines changed: 147 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from abc import abstractmethod
44
from collections.abc import Mapping
5+
from dataclasses import dataclass
56
from typing import TYPE_CHECKING, Literal, Protocol, TypeGuard, runtime_checkable
67

78
from typing_extensions import ReadOnly, TypedDict
@@ -13,13 +14,13 @@
1314

1415
if TYPE_CHECKING:
1516
from collections.abc import Awaitable, Callable, Iterable
16-
from typing import Self
17+
from typing import Any, Self
1718

1819
from zarr.abc.store import ByteGetter, ByteSetter, Store
1920
from zarr.core.array_spec import ArraySpec
2021
from zarr.core.chunk_grids import ChunkGrid
2122
from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType
22-
from zarr.core.indexing import SelectorTuple
23+
from zarr.core.indexing import ChunkProjection, SelectorTuple
2324
from zarr.core.metadata import ArrayMetadata
2425

2526
__all__ = [
@@ -33,6 +34,9 @@
3334
"CodecOutput",
3435
"CodecPipeline",
3536
"GetResult",
37+
"PreparedWrite",
38+
"SupportsChunkCodec",
39+
"SupportsChunkPacking",
3640
"SupportsSyncCodec",
3741
]
3842

@@ -82,6 +86,116 @@ def _decode_sync(self, chunk_data: CO, chunk_spec: ArraySpec) -> CI: ...
8286
def _encode_sync(self, chunk_data: CI, chunk_spec: ArraySpec) -> CO | None: ...
8387

8488

89+
class SupportsChunkCodec(Protocol):
    """Structural interface for synchronous whole-chunk encode/decode.

    Any object that exposes an ``array_spec`` together with
    ``decode_chunk`` / ``encode_chunk`` satisfies this protocol;
    ``ChunkTransform`` is one such implementation.
    """

    # Spec (shape, dtype, prototype, ...) of the chunks this chain handles.
    array_spec: ArraySpec

    def decode_chunk(self, chunk_bytes: Buffer) -> NDBuffer: ...

    def encode_chunk(self, chunk_array: NDBuffer) -> Buffer | None: ...
102+
class SupportsChunkPacking(Protocol):
    """Protocol for codecs that map between storage blobs and inner chunks
    and that own the prepare/finalize IO lifecycle around them.

    `BytesCodec` and `ShardingCodec` implement this protocol. It lets the
    pipeline keep IO (prepare/finalize) apart from compute (decode, merge,
    re-encode) so that the compute phase can run on a thread pool.

    Lifecycle:

    1. **Prepare** — read the existing blob from the store (for a partial
       write) and unpack it into per-inner-chunk buffers, producing a
       `PreparedWrite`.
    2. **Compute** — walk `PreparedWrite.indexer`; for each projection,
       decode the matching inner chunk, merge the new data, re-encode,
       and write the result back into `PreparedWrite.chunk_dict`.
    3. **Finalize** — repack `chunk_dict` into a single blob and store it.
    """

    @property
    def inner_codec_chain(self) -> SupportsChunkCodec | None:
        """Codec chain for inner chunks; `None` means "use the pipeline's"."""
        ...

    def unpack_chunks(
        self,
        raw: Buffer | None,
        chunk_spec: ArraySpec,
    ) -> dict[tuple[int, ...], Buffer | None]:
        """Split a storage blob into encoded buffers keyed by inner-chunk coords."""
        ...

    def pack_chunks(
        self,
        chunk_dict: dict[tuple[int, ...], Buffer | None],
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Combine per-inner-chunk encoded buffers into one storage blob."""
        ...

    def prepare_read_sync(
        self,
        byte_getter: Any,
        chunk_selection: SelectorTuple,
        codec_chain: SupportsChunkCodec,
    ) -> NDBuffer | None:
        """Synchronously fetch + decode a chunk and return the selected region."""
        ...

    def prepare_write_sync(
        self,
        byte_setter: Any,
        codec_chain: SupportsChunkCodec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        replace: bool,
    ) -> PreparedWrite:
        """Synchronously stage a write: fetch existing bytes if needed, unpack."""
        ...

    def finalize_write_sync(
        self,
        prepared: PreparedWrite,
        chunk_spec: ArraySpec,
        byte_setter: Any,
    ) -> None:
        """Synchronously pack the staged chunk data and write it to the store."""
        ...

    async def prepare_read(
        self,
        byte_getter: Any,
        chunk_selection: SelectorTuple,
        codec_chain: SupportsChunkCodec,
    ) -> NDBuffer | None:
        """Async variant of `prepare_read_sync`."""
        ...

    async def prepare_write(
        self,
        byte_setter: Any,
        codec_chain: SupportsChunkCodec,
        chunk_selection: SelectorTuple,
        out_selection: SelectorTuple,
        replace: bool,
    ) -> PreparedWrite:
        """Async variant of `prepare_write_sync`."""
        ...

    async def finalize_write(
        self,
        prepared: PreparedWrite,
        chunk_spec: ArraySpec,
        byte_setter: Any,
    ) -> None:
        """Async variant of `finalize_write_sync`."""
        ...
198+
85199
class BaseCodec[CI: CodecInput, CO: CodecOutput](Metadata):
86200
"""Generic base class for codecs.
87201
@@ -207,6 +321,37 @@ class ArrayArrayCodec(BaseCodec[NDBuffer, NDBuffer]):
207321
"""Base class for array-to-array codecs."""
208322

209323

324+
@dataclass
class PreparedWrite:
    """State carried from the read/prepare phase to the write/finalize phase.

    Produced by `prepare_write_sync` / `prepare_write` and consumed by
    `finalize_write_sync` / `finalize_write`. In between, the compute
    phase walks `indexer`, decodes the matching entry of `chunk_dict`,
    merges the new data, re-encodes it, and stores the result back into
    `chunk_dict`.

    Attributes
    ----------
    chunk_dict : dict[tuple[int, ...], Buffer | None]
        Encoded bytes per inner chunk, keyed by inner-chunk coordinates.
        A regular (non-sharded) array uses the single key ``(0,)``; a
        sharded array holds one entry per inner chunk of the shard,
        including chunks not being modified (those pass through
        unchanged). A ``None`` value records that the chunk was absent
        on disk.
    indexer : list[ChunkProjection]
        The inner chunks to modify — a subset of `chunk_dict`'s keys;
        untouched chunks are not listed. Each projection's
        ``chunk_coords`` names a key of `chunk_dict`,
        ``chunk_selection`` the region inside that inner chunk, and
        ``out_selection`` the matching region of the source value array.
    """

    chunk_dict: dict[tuple[int, ...], Buffer | None]
    indexer: list[ChunkProjection]
353+
354+
210355
class ArrayBytesCodec(BaseCodec[NDBuffer, Buffer]):
211356
"""Base class for array-to-bytes codecs."""
212357

src/zarr/codecs/bytes.py

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
from enum import Enum
66
from typing import TYPE_CHECKING
77

8-
from zarr.abc.codec import ArrayBytesCodec
8+
from zarr.abc.codec import ArrayBytesCodec, PreparedWrite, SupportsChunkCodec
99
from zarr.core.buffer import Buffer, NDBuffer
1010
from zarr.core.common import JSON, parse_enum, parse_named_configuration
1111
from zarr.core.dtype.common import HasEndianness
1212

1313
if TYPE_CHECKING:
14-
from typing import Self
14+
from typing import Any, Self
1515

1616
from zarr.core.array_spec import ArraySpec
17+
from zarr.core.indexing import SelectorTuple
1718

1819

1920
class Endian(Enum):
@@ -125,3 +126,114 @@ async def _encode_single(
125126

126127
def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
127128
return input_byte_length
129+
130+
# -- SupportsChunkPacking --
131+
132+
@property
def inner_codec_chain(self) -> SupportsChunkCodec | None:
    """Always ``None``: a plain bytes codec has no inner chunk layout,
    so the pipeline falls back to its own codec chain."""
    return None
136+
137+
def unpack_chunks(
    self,
    raw: Buffer | None,
    chunk_spec: ArraySpec,
) -> dict[tuple[int, ...], Buffer | None]:
    """Expose the blob as a one-entry chunk dict.

    A non-sharded chunk *is* its storage blob, so the mapping holds a
    single entry at coordinates ``(0,)``; ``raw is None`` (chunk absent
    on disk) passes through unchanged. ``chunk_spec`` is unused here —
    it exists for protocol compatibility with sharding codecs.
    """
    single_key: tuple[int, ...] = (0,)
    return {single_key: raw}
144+
145+
def pack_chunks(
    self,
    chunk_dict: dict[tuple[int, ...], Buffer | None],
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Collapse the one-entry chunk dict back into a storage blob.

    Returns whatever is stored at ``(0,)``; a missing entry or an
    explicit ``None`` both yield ``None`` (nothing to store).
    """
    return chunk_dict.get((0,))
152+
153+
def prepare_read_sync(
    self,
    byte_getter: Any,
    chunk_selection: SelectorTuple,
    codec_chain: SupportsChunkCodec,
) -> NDBuffer | None:
    """Synchronously fetch and decode the chunk, then slice out the selection.

    Returns ``None`` when the chunk does not exist in the store.
    """
    encoded = byte_getter.get_sync(prototype=codec_chain.array_spec.prototype)
    if encoded is not None:
        return codec_chain.decode_chunk(encoded)[chunk_selection]
    return None
165+
166+
def prepare_write_sync(
    self,
    byte_setter: Any,
    codec_chain: SupportsChunkCodec,
    chunk_selection: SelectorTuple,
    out_selection: SelectorTuple,
    replace: bool,
) -> PreparedWrite:
    """Stage a synchronous write of this (single) chunk.

    When ``replace`` is false, the existing bytes are fetched so that
    the compute phase can merge new data into them; when true, the
    fetch is skipped and the chunk is overwritten wholesale.
    """
    from zarr.core.indexing import ChunkProjection

    prior: Buffer | None = (
        None
        if replace
        else byte_setter.get_sync(prototype=codec_chain.array_spec.prototype)
    )
    projection = ChunkProjection((0,), chunk_selection, out_selection, replace)  # type: ignore[arg-type]
    return PreparedWrite(
        chunk_dict=self.unpack_chunks(prior, codec_chain.array_spec),
        indexer=[projection],
    )
183+
184+
def finalize_write_sync(
    self,
    prepared: PreparedWrite,
    chunk_spec: ArraySpec,
    byte_setter: Any,
) -> None:
    """Pack the staged chunk data and persist it.

    An empty pack result (``None``) means the chunk should not exist,
    so the stored object is deleted instead of written.
    """
    packed = self.pack_chunks(prepared.chunk_dict, chunk_spec)
    if packed is not None:
        byte_setter.set_sync(packed)
    else:
        byte_setter.delete_sync()
196+
197+
async def prepare_read(
    self,
    byte_getter: Any,
    chunk_selection: SelectorTuple,
    codec_chain: SupportsChunkCodec,
) -> NDBuffer | None:
    """Async counterpart of `prepare_read_sync`: fetch, decode, slice.

    Returns ``None`` when the chunk does not exist in the store.
    """
    encoded = await byte_getter.get(prototype=codec_chain.array_spec.prototype)
    if encoded is not None:
        return codec_chain.decode_chunk(encoded)[chunk_selection]
    return None
209+
210+
async def prepare_write(
    self,
    byte_setter: Any,
    codec_chain: SupportsChunkCodec,
    chunk_selection: SelectorTuple,
    out_selection: SelectorTuple,
    replace: bool,
) -> PreparedWrite:
    """Async counterpart of `prepare_write_sync`.

    When ``replace`` is false, the existing bytes are fetched so that
    the compute phase can merge new data into them; when true, the
    fetch is skipped and the chunk is overwritten wholesale.
    """
    from zarr.core.indexing import ChunkProjection

    prior: Buffer | None = None
    if not replace:
        prior = await byte_setter.get(prototype=codec_chain.array_spec.prototype)
    projection = ChunkProjection((0,), chunk_selection, out_selection, replace)  # type: ignore[arg-type]
    return PreparedWrite(
        chunk_dict=self.unpack_chunks(prior, codec_chain.array_spec),
        indexer=[projection],
    )
227+
228+
async def finalize_write(
    self,
    prepared: PreparedWrite,
    chunk_spec: ArraySpec,
    byte_setter: Any,
) -> None:
    """Async counterpart of `finalize_write_sync`: pack, then write or delete.

    An empty pack result (``None``) means the chunk should not exist,
    so the stored object is deleted instead of written.
    """
    packed = self.pack_chunks(prepared.chunk_dict, chunk_spec)
    if packed is not None:
        await byte_setter.set(packed)
    else:
        await byte_setter.delete()

src/zarr/core/codec_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def __post_init__(self) -> None:
118118
bb_sync.append(bb_codec)
119119
self._bb_codecs = tuple(bb_sync)
120120

121-
def decode(
121+
def decode_chunk(
122122
self,
123123
chunk_bytes: Buffer,
124124
) -> NDBuffer:
@@ -137,7 +137,7 @@ def decode(
137137

138138
return chunk_array
139139

140-
def encode(
140+
def encode_chunk(
141141
self,
142142
chunk_array: NDBuffer,
143143
) -> Buffer | None:

tests/test_sync_codec_pipeline.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ def test_encode_decode_roundtrip(
9999
chain = ChunkTransform(codecs=codecs, array_spec=spec)
100100
nd_buf = _make_nd_buffer(arr)
101101

102-
encoded = chain.encode(nd_buf)
102+
encoded = chain.encode_chunk(nd_buf)
103103
assert encoded is not None
104-
decoded = chain.decode(encoded)
104+
decoded = chain.decode_chunk(encoded)
105105
np.testing.assert_array_equal(arr, decoded.as_numpy_array())
106106

107107

@@ -142,4 +142,4 @@ def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer
142142
)
143143
arr = np.arange(12, dtype="float64").reshape(3, 4)
144144
nd_buf = _make_nd_buffer(arr)
145-
assert chain.encode(nd_buf) is None
145+
assert chain.encode_chunk(nd_buf) is None

0 commit comments

Comments
 (0)