Skip to content

Commit cd4efb0

Browse files
d-v-b and claude committed
add CodecChain dataclass and sync codec tests
Introduces CodecChain, a frozen dataclass that chains array-array, array-bytes, and bytes-bytes codecs with synchronous encode/decode methods. Pure compute only -- no IO, no threading, no batching. Also adds sync roundtrip tests for individual codecs (blosc, gzip, zstd, crc32c, bytes, transpose, vlen) and CodecChain integration tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 2b64daa commit cd4efb0

9 files changed

Lines changed: 474 additions & 5 deletions

File tree

src/zarr/core/codec_pipeline.py

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from __future__ import annotations
22

3-
from dataclasses import dataclass
3+
from dataclasses import dataclass, field
44
from itertools import islice, pairwise
5-
from typing import TYPE_CHECKING, Any, TypeVar
5+
from typing import TYPE_CHECKING, Any, TypeVar, cast
66
from warnings import warn
77

88
from zarr.abc.codec import (
@@ -13,6 +13,7 @@
1313
BytesBytesCodec,
1414
Codec,
1515
CodecPipeline,
16+
SupportsSyncCodec,
1617
)
1718
from zarr.core.common import concurrent_map
1819
from zarr.core.config import config
@@ -68,6 +69,134 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any:
6869
return fill_value
6970

7071

72+
@dataclass(frozen=True)
class CodecChain:
    """Lightweight codec chain: array-array -> array-bytes -> bytes-bytes.

    Pure compute only -- no IO methods, no threading, no batching.

    Iterating over a chain yields its codecs in encode order: every
    array-array codec, then the array-bytes codec, then every bytes-bytes
    codec.
    """

    array_array_codecs: tuple[ArrayArrayCodec, ...]
    array_bytes_codec: ArrayBytesCodec
    bytes_bytes_codecs: tuple[BytesBytesCodec, ...]

    # Cached in ``__post_init__``: True when every codec in the chain passes
    # ``isinstance(codec, SupportsSyncCodec)``. Excluded from init/repr/compare
    # because it is derived from the three codec fields above.
    _all_sync: bool = field(default=False, init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        """Compute and cache whether every codec in the chain supports sync."""
        supports_sync = all(isinstance(codec, SupportsSyncCodec) for codec in self)
        # The dataclass is frozen, so the cached flag has to bypass the
        # generated ``__setattr__``.
        object.__setattr__(self, "_all_sync", supports_sync)

    def __iter__(self) -> Iterator[Codec]:
        """Yield the codecs in encode order."""
        yield from self.array_array_codecs
        yield self.array_bytes_codec
        yield from self.bytes_bytes_codecs

    @property
    def all_sync(self) -> bool:
        """Whether every codec in the chain is a ``SupportsSyncCodec``."""
        return self._all_sync

    @classmethod
    def from_codecs(cls, codecs: Iterable[Codec]) -> CodecChain:
        """Build a chain from a flat iterable of codecs via ``codecs_from_list``."""
        aa, ab, bb = codecs_from_list(list(codecs))
        return cls(array_array_codecs=aa, array_bytes_codec=ab, bytes_bytes_codecs=bb)

    def resolve_metadata_chain(
        self, chunk_spec: ArraySpec
    ) -> tuple[
        list[tuple[ArrayArrayCodec, ArraySpec]],
        tuple[ArrayBytesCodec, ArraySpec],
        list[tuple[BytesBytesCodec, ArraySpec]],
    ]:
        """Resolve metadata through the codec chain for a single chunk_spec.

        Each codec is paired with the spec as it stands *before* that codec's
        own ``resolve_metadata`` is applied; the spec returned by
        ``resolve_metadata`` is then handed to the next codec.

        Returns
        -------
        tuple
            ``(aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec)``.
        """
        spec = chunk_spec

        aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = []
        for aa_codec in self.array_array_codecs:
            aa_codecs_with_spec.append((aa_codec, spec))
            spec = aa_codec.resolve_metadata(spec)

        ab_codec_with_spec = (self.array_bytes_codec, spec)
        spec = self.array_bytes_codec.resolve_metadata(spec)

        bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = []
        for bb_codec in self.bytes_bytes_codecs:
            bb_codecs_with_spec.append((bb_codec, spec))
            spec = bb_codec.resolve_metadata(spec)

        return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec)

    def decode_chunk(
        self,
        chunk_bytes: Buffer,
        chunk_spec: ArraySpec,
        aa_chain: Iterable[tuple[ArrayArrayCodec, ArraySpec]] | None = None,
        ab_pair: tuple[ArrayBytesCodec, ArraySpec] | None = None,
        bb_chain: Iterable[tuple[BytesBytesCodec, ArraySpec]] | None = None,
    ) -> NDBuffer:
        """Decode a single chunk through the full codec chain, synchronously.

        Pure compute -- no IO. Only callable when all codecs support sync.

        The optional ``aa_chain``, ``ab_pair``, ``bb_chain`` parameters allow
        pre-resolved metadata to be reused across many chunks with the same spec.
        Any of them that is not provided is taken from
        ``resolve_metadata_chain(chunk_spec)``; the ones that are provided are
        used as given.
        """
        if aa_chain is None or ab_pair is None or bb_chain is None:
            resolved_aa, resolved_ab, resolved_bb = self.resolve_metadata_chain(chunk_spec)
            # Fill in only what the caller left out, so a partially supplied
            # set of pre-resolved metadata is not silently thrown away.
            if aa_chain is None:
                aa_chain = resolved_aa
            if ab_pair is None:
                ab_pair = resolved_ab
            if bb_chain is None:
                bb_chain = resolved_bb

        # Decoding walks the chain in the opposite direction to encoding:
        # bytes-bytes codecs (last to first), then array-bytes, then
        # array-array codecs (last to first).
        bb_out: Any = chunk_bytes
        for bb_codec, spec in reversed(list(bb_chain)):
            bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec)

        ab_codec, ab_spec = ab_pair
        ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec)

        for aa_codec, spec in reversed(list(aa_chain)):
            ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec)

        return ab_out  # type: ignore[no-any-return]

    def encode_chunk(
        self,
        chunk_array: NDBuffer,
        chunk_spec: ArraySpec,
    ) -> Buffer | None:
        """Encode a single chunk through the full codec chain, synchronously.

        Pure compute -- no IO. Only callable when all codecs support sync.

        Returns ``None`` as soon as the value flowing through the chain is
        ``None`` before a codec would be applied to it; otherwise returns the
        output of the last codec.
        """
        spec = chunk_spec
        aa_out: Any = chunk_array

        for aa_codec in self.array_array_codecs:
            if aa_out is None:
                return None
            aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec)
            spec = aa_codec.resolve_metadata(spec)

        if aa_out is None:
            return None
        bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec)
        spec = self.array_bytes_codec.resolve_metadata(spec)

        for bb_codec in self.bytes_bytes_codecs:
            if bb_out is None:
                return None
            bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec)
            spec = bb_codec.resolve_metadata(spec)

        return bb_out  # type: ignore[no-any-return]

    def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int:
        """Fold ``compute_encoded_size`` over the chain, starting from ``byte_length``.

        Each codec receives the size produced by the previous codec together
        with the spec resolved up to (but not including) that codec.
        """
        for codec in self:
            byte_length = codec.compute_encoded_size(byte_length, array_spec)
            array_spec = codec.resolve_metadata(array_spec)
        return byte_length

    def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec:
        """Return ``chunk_spec`` after every codec's ``resolve_metadata`` is applied in order."""
        for codec in self:
            chunk_spec = codec.resolve_metadata(chunk_spec)
        return chunk_spec
198+
199+
71200
@dataclass(frozen=True)
72201
class BatchedCodecPipeline(CodecPipeline):
73202
"""Default codec pipeline.

tests/test_codecs/test_blosc.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
from packaging.version import Version
77

88
import zarr
9+
from zarr.abc.codec import SupportsSyncCodec
910
from zarr.codecs import BloscCodec
1011
from zarr.codecs.blosc import BloscShuffle, Shuffle
11-
from zarr.core.array_spec import ArraySpec
12+
from zarr.core.array_spec import ArrayConfig, ArraySpec
1213
from zarr.core.buffer import default_buffer_prototype
13-
from zarr.core.dtype import UInt16
14+
from zarr.core.dtype import UInt16, get_data_type_from_native_dtype
1415
from zarr.storage import MemoryStore, StorePath
1516

1617

@@ -110,3 +111,27 @@ async def test_typesize() -> None:
110111
else:
111112
expected_size = 10216
112113
assert size == expected_size, msg
114+
115+
116+
def test_blosc_codec_supports_sync() -> None:
    """A default-constructed BloscCodec passes the SupportsSyncCodec isinstance check."""
    blosc = BloscCodec()
    assert isinstance(blosc, SupportsSyncCodec)
118+
119+
120+
def test_blosc_codec_sync_roundtrip() -> None:
    """Encode then decode float64 data with BloscCodec's sync methods and compare to the input."""
    source = np.arange(100, dtype="float64")
    data_type = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    chunk_spec = ArraySpec(
        shape=source.shape,
        dtype=data_type,
        fill_value=data_type.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    # The codec is fed a Buffer wrapping an unsigned-byte view of the array.
    raw_bytes = source.view("B")
    payload = default_buffer_prototype().buffer.from_array_like(raw_bytes)
    blosc = BloscCodec(typesize=8)

    compressed = blosc._encode_sync(payload, chunk_spec)
    assert compressed is not None

    restored = blosc._decode_sync(compressed, chunk_spec)
    roundtripped = np.frombuffer(restored.as_numpy_array(), dtype="float64")
    np.testing.assert_array_equal(source, roundtripped)

tests/test_codecs/test_crc32c.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from __future__ import annotations
2+
3+
import numpy as np
4+
5+
from zarr.abc.codec import SupportsSyncCodec
6+
from zarr.codecs.crc32c_ import Crc32cCodec
7+
from zarr.core.array_spec import ArrayConfig, ArraySpec
8+
from zarr.core.buffer import default_buffer_prototype
9+
from zarr.core.dtype import get_data_type_from_native_dtype
10+
11+
12+
def test_crc32c_codec_supports_sync() -> None:
    """A default-constructed Crc32cCodec passes the SupportsSyncCodec isinstance check."""
    crc32c = Crc32cCodec()
    assert isinstance(crc32c, SupportsSyncCodec)
14+
15+
16+
def test_crc32c_codec_sync_roundtrip() -> None:
    """Encode then decode float64 data with Crc32cCodec's sync methods and compare to the input."""
    source = np.arange(100, dtype="float64")
    data_type = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    chunk_spec = ArraySpec(
        shape=source.shape,
        dtype=data_type,
        fill_value=data_type.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    # The codec is fed a Buffer wrapping an unsigned-byte view of the array.
    raw_bytes = source.view("B")
    payload = default_buffer_prototype().buffer.from_array_like(raw_bytes)
    crc32c = Crc32cCodec()

    checksummed = crc32c._encode_sync(payload, chunk_spec)
    assert checksummed is not None

    restored = crc32c._decode_sync(checksummed, chunk_spec)
    roundtripped = np.frombuffer(restored.as_numpy_array(), dtype="float64")
    np.testing.assert_array_equal(source, roundtripped)

tests/test_codecs/test_endian.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import pytest
55

66
import zarr
7+
from zarr.abc.codec import SupportsSyncCodec
78
from zarr.abc.store import Store
89
from zarr.codecs import BytesCodec
10+
from zarr.core.array_spec import ArrayConfig, ArraySpec
11+
from zarr.core.buffer import NDBuffer, default_buffer_prototype
12+
from zarr.core.dtype import get_data_type_from_native_dtype
913
from zarr.storage import StorePath
1014

1115
from .test_codecs import _AsyncArrayProxy
@@ -33,6 +37,31 @@ async def test_endian(store: Store, endian: Literal["big", "little"]) -> None:
3337
assert np.array_equal(data, readback_data)
3438

3539

40+
def test_bytes_codec_supports_sync() -> None:
    """A default-constructed BytesCodec passes the SupportsSyncCodec isinstance check."""
    bytes_codec = BytesCodec()
    assert isinstance(bytes_codec, SupportsSyncCodec)
42+
43+
44+
def test_bytes_codec_sync_roundtrip() -> None:
    """Encode then decode a float64 NDBuffer with BytesCodec's sync methods and compare to the input."""
    source = np.arange(100, dtype="float64")
    data_type = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    chunk_spec = ArraySpec(
        shape=source.shape,
        dtype=data_type,
        fill_value=data_type.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    chunk: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(source)

    # The codec used for the roundtrip is the one returned by
    # evolve_from_array_spec, not the freshly constructed instance.
    bytes_codec = BytesCodec().evolve_from_array_spec(chunk_spec)

    serialized = bytes_codec._encode_sync(chunk, chunk_spec)
    assert serialized is not None

    restored = bytes_codec._decode_sync(serialized, chunk_spec)
    np.testing.assert_array_equal(source, restored.as_numpy_array())
63+
64+
3665
@pytest.mark.filterwarnings("ignore:The endianness of the requested serializer")
3766
@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"])
3867
@pytest.mark.parametrize("dtype_input_endian", [">u2", "<u2"])

tests/test_codecs/test_gzip.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,12 @@
22
import pytest
33

44
import zarr
5+
from zarr.abc.codec import SupportsSyncCodec
56
from zarr.abc.store import Store
67
from zarr.codecs import GzipCodec
8+
from zarr.core.array_spec import ArrayConfig, ArraySpec
9+
from zarr.core.buffer import default_buffer_prototype
10+
from zarr.core.dtype import get_data_type_from_native_dtype
711
from zarr.storage import StorePath
812

913

@@ -22,3 +26,27 @@ def test_gzip(store: Store) -> None:
2226

2327
a[:, :] = data
2428
assert np.array_equal(data, a[:, :])
29+
30+
31+
def test_gzip_codec_supports_sync() -> None:
    """A default-constructed GzipCodec passes the SupportsSyncCodec isinstance check."""
    gzip_codec = GzipCodec()
    assert isinstance(gzip_codec, SupportsSyncCodec)
33+
34+
35+
def test_gzip_codec_sync_roundtrip() -> None:
    """Encode then decode float64 data with GzipCodec's sync methods and compare to the input."""
    source = np.arange(100, dtype="float64")
    data_type = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    chunk_spec = ArraySpec(
        shape=source.shape,
        dtype=data_type,
        fill_value=data_type.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    # The codec is fed a Buffer wrapping an unsigned-byte view of the array.
    raw_bytes = source.view("B")
    payload = default_buffer_prototype().buffer.from_array_like(raw_bytes)
    gzip_codec = GzipCodec(level=1)

    compressed = gzip_codec._encode_sync(payload, chunk_spec)
    assert compressed is not None

    restored = gzip_codec._decode_sync(compressed, chunk_spec)
    roundtripped = np.frombuffer(restored.as_numpy_array(), dtype="float64")
    np.testing.assert_array_equal(source, roundtripped)

tests/test_codecs/test_transpose.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33

44
import zarr
55
from zarr import AsyncArray, config
6+
from zarr.abc.codec import SupportsSyncCodec
67
from zarr.abc.store import Store
78
from zarr.codecs import TransposeCodec
9+
from zarr.core.array_spec import ArrayConfig, ArraySpec
10+
from zarr.core.buffer import NDBuffer, default_buffer_prototype
811
from zarr.core.common import MemoryOrder
12+
from zarr.core.dtype import get_data_type_from_native_dtype
913
from zarr.storage import StorePath
1014

1115
from .test_codecs import _AsyncArrayProxy
@@ -93,3 +97,27 @@ def test_transpose_invalid(
9397
chunk_key_encoding={"name": "v2", "separator": "."},
9498
filters=[TransposeCodec(order=order)], # type: ignore[arg-type]
9599
)
100+
101+
102+
def test_transpose_codec_supports_sync() -> None:
    """A TransposeCodec built with order (0, 1) passes the SupportsSyncCodec isinstance check."""
    transpose = TransposeCodec(order=(0, 1))
    assert isinstance(transpose, SupportsSyncCodec)
104+
105+
106+
def test_transpose_codec_sync_roundtrip() -> None:
    """Encode then decode a 3x4 float64 NDBuffer with TransposeCodec's sync methods."""
    source = np.arange(12, dtype="float64").reshape(3, 4)
    data_type = get_data_type_from_native_dtype(source.dtype)
    array_config = ArrayConfig(order="C", write_empty_chunks=True)
    chunk_spec = ArraySpec(
        shape=source.shape,
        dtype=data_type,
        fill_value=data_type.cast_scalar(0),
        config=array_config,
        prototype=default_buffer_prototype(),
    )
    chunk: NDBuffer = default_buffer_prototype().nd_buffer.from_numpy_array(source)
    transpose = TransposeCodec(order=(1, 0))

    transposed = transpose._encode_sync(chunk, chunk_spec)
    assert transposed is not None

    # Decoding is given the spec returned by resolve_metadata, not the input spec.
    decode_spec = transpose.resolve_metadata(chunk_spec)
    restored = transpose._decode_sync(transposed, decode_spec)
    np.testing.assert_array_equal(source, restored.as_numpy_array())

tests/test_codecs/test_vlen.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55

66
import zarr
77
from zarr import Array
8-
from zarr.abc.codec import Codec
8+
from zarr.abc.codec import Codec, SupportsSyncCodec
99
from zarr.abc.store import Store
1010
from zarr.codecs import ZstdCodec
11+
from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec
1112
from zarr.core.dtype import get_data_type_from_native_dtype
1213
from zarr.core.dtype.npy.string import _NUMPY_SUPPORTS_VLEN_STRING
1314
from zarr.core.metadata.v3 import ArrayV3Metadata
@@ -62,3 +63,11 @@ def test_vlen_string(
6263
assert np.array_equal(data, b[:, :])
6364
assert b.metadata.data_type == get_data_type_from_native_dtype(data.dtype)
6465
assert a.dtype == data.dtype
66+
67+
68+
def test_vlen_utf8_codec_supports_sync() -> None:
    """A default-constructed VLenUTF8Codec passes the SupportsSyncCodec isinstance check."""
    vlen_utf8 = VLenUTF8Codec()
    assert isinstance(vlen_utf8, SupportsSyncCodec)
70+
71+
72+
def test_vlen_bytes_codec_supports_sync() -> None:
    """A default-constructed VLenBytesCodec passes the SupportsSyncCodec isinstance check."""
    vlen_bytes = VLenBytesCodec()
    assert isinstance(vlen_bytes, SupportsSyncCodec)

0 commit comments

Comments
 (0)