Skip to content

Commit b67a5a0

Browse files
d-v-bclaude
andcommitted
test: parametrize codec pipeline tests over BatchedCodecPipeline and PhasedCodecPipeline
All pipeline contract tests now run against both implementations: roundtrip, sharded roundtrip, partial writes, missing chunks, strided reads, multidimensional, and compression. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent f66a19c commit b67a5a0

File tree

1 file changed

+261
-6
lines changed

1 file changed

+261
-6
lines changed

tests/test_codec_pipeline.py

Lines changed: 261 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,55 @@
11
from __future__ import annotations
22

3+
from typing import TYPE_CHECKING, Any
4+
5+
import numpy as np
36
import pytest
47

58
import zarr
69
from zarr.core.array import _get_chunk_spec
710
from zarr.core.buffer.core import default_buffer_prototype
11+
from zarr.core.config import config as zarr_config
812
from zarr.core.indexing import BasicIndexer
13+
from zarr.errors import ChunkNotFoundError
914
from zarr.storage import MemoryStore
1015

16+
if TYPE_CHECKING:
17+
from collections.abc import Generator
18+
19+
pipeline_paths = [
20+
"zarr.core.codec_pipeline.BatchedCodecPipeline",
21+
"zarr.core.codec_pipeline.PhasedCodecPipeline",
22+
]
23+
24+
25+
@pytest.fixture(params=pipeline_paths, ids=["batched", "phased"])
26+
def pipeline_class(request: pytest.FixtureRequest) -> Generator[str]:
27+
"""Temporarily set the codec pipeline class for the test."""
28+
path = request.param
29+
with zarr_config.set({"codec_pipeline.path": path}):
30+
yield path
31+
32+
33+
# ---------------------------------------------------------------------------
34+
# GetResult status tests (low-level pipeline API)
35+
# ---------------------------------------------------------------------------
36+
1137

1238
@pytest.mark.parametrize(
1339
("write_slice", "read_slice", "expected_statuses"),
1440
[
15-
# Write all chunks, read all — all present
1641
(slice(None), slice(None), ("present", "present", "present")),
17-
# Write first chunk only, read all — first present, rest missing
1842
(slice(0, 2), slice(None), ("present", "missing", "missing")),
19-
# Write nothing, read all — all missing
2043
(None, slice(None), ("missing", "missing", "missing")),
2144
],
2245
)
2346
async def test_read_returns_get_results(
47+
pipeline_class: str,
2448
write_slice: slice | None,
2549
read_slice: slice,
2650
expected_statuses: tuple[str, ...],
2751
) -> None:
28-
"""
29-
Test that CodecPipeline.read returns a tuple of GetResult with correct statuses.
30-
"""
52+
"""CodecPipeline.read returns GetResult with correct statuses."""
3153
store = MemoryStore()
3254
arr = zarr.open_array(store, mode="w", shape=(6,), chunks=(2,), dtype="int64", fill_value=-1)
3355

@@ -70,3 +92,236 @@ async def test_read_returns_get_results(
7092
assert len(results) == len(expected_statuses)
7193
for result, expected_status in zip(results, expected_statuses, strict=True):
7294
assert result["status"] == expected_status
95+
96+
97+
# ---------------------------------------------------------------------------
98+
# End-to-end read/write tests
99+
# ---------------------------------------------------------------------------
100+
101+
array_configs = [
102+
pytest.param(
103+
{"shape": (100,), "dtype": "float64", "chunks": (10,), "shards": None, "compressors": None},
104+
id="1d-unsharded",
105+
),
106+
pytest.param(
107+
{
108+
"shape": (100,),
109+
"dtype": "float64",
110+
"chunks": (10,),
111+
"shards": (100,),
112+
"compressors": None,
113+
},
114+
id="1d-sharded",
115+
),
116+
pytest.param(
117+
{
118+
"shape": (10, 20),
119+
"dtype": "int32",
120+
"chunks": (5, 10),
121+
"shards": None,
122+
"compressors": None,
123+
},
124+
id="2d-unsharded",
125+
),
126+
pytest.param(
127+
{
128+
"shape": (100,),
129+
"dtype": "float64",
130+
"chunks": (10,),
131+
"shards": None,
132+
"compressors": {"name": "gzip", "configuration": {"level": 1}},
133+
},
134+
id="1d-gzip",
135+
),
136+
]
137+
138+
139+
@pytest.mark.parametrize("arr_kwargs", array_configs)
140+
async def test_roundtrip(pipeline_class: str, arr_kwargs: dict[str, Any]) -> None:
141+
"""Data survives a full write/read roundtrip."""
142+
store = MemoryStore()
143+
arr = zarr.create_array(store=store, fill_value=0, **arr_kwargs)
144+
data = np.arange(int(np.prod(arr.shape)), dtype=arr.dtype).reshape(arr.shape)
145+
arr[:] = data
146+
np.testing.assert_array_equal(arr[:], data)
147+
148+
149+
@pytest.mark.parametrize("arr_kwargs", array_configs)
150+
async def test_missing_chunks_fill_value(pipeline_class: str, arr_kwargs: dict[str, Any]) -> None:
151+
"""Reading unwritten chunks returns the fill value."""
152+
store = MemoryStore()
153+
fill = -1
154+
arr = zarr.create_array(store=store, fill_value=fill, **arr_kwargs)
155+
expected = np.full(arr.shape, fill, dtype=arr.dtype)
156+
np.testing.assert_array_equal(arr[:], expected)
157+
158+
159+
write_then_read_cases = [
160+
pytest.param(
161+
slice(None),
162+
np.s_[:],
163+
id="full-write-full-read",
164+
),
165+
pytest.param(
166+
slice(5, 15),
167+
np.s_[:],
168+
id="partial-write-full-read",
169+
),
170+
pytest.param(
171+
slice(None),
172+
np.s_[::3],
173+
id="full-write-strided-read",
174+
),
175+
pytest.param(
176+
slice(None),
177+
np.s_[10:20],
178+
id="full-write-slice-read",
179+
),
180+
]
181+
182+
183+
@pytest.mark.parametrize(
184+
"arr_kwargs",
185+
[
186+
pytest.param(
187+
{
188+
"shape": (100,),
189+
"dtype": "float64",
190+
"chunks": (10,),
191+
"shards": None,
192+
"compressors": None,
193+
},
194+
id="unsharded",
195+
),
196+
pytest.param(
197+
{
198+
"shape": (100,),
199+
"dtype": "float64",
200+
"chunks": (10,),
201+
"shards": (100,),
202+
"compressors": None,
203+
},
204+
id="sharded",
205+
),
206+
],
207+
)
208+
@pytest.mark.parametrize(("write_sel", "read_sel"), write_then_read_cases)
209+
async def test_write_then_read(
210+
pipeline_class: str,
211+
arr_kwargs: dict[str, Any],
212+
write_sel: slice,
213+
read_sel: slice,
214+
) -> None:
215+
"""Various write + read selection combinations produce correct results."""
216+
store = MemoryStore()
217+
arr = zarr.create_array(store=store, fill_value=0.0, **arr_kwargs)
218+
full = np.zeros(arr.shape, dtype=arr.dtype)
219+
220+
write_data = np.arange(len(full[write_sel]), dtype=arr.dtype) + 1
221+
full[write_sel] = write_data
222+
arr[write_sel] = write_data
223+
224+
np.testing.assert_array_equal(arr[read_sel], full[read_sel])
225+
226+
227+
# ---------------------------------------------------------------------------
228+
# write_empty_chunks / read_missing_chunks config tests
229+
# ---------------------------------------------------------------------------
230+
231+
232+
@pytest.mark.parametrize(
233+
"arr_kwargs",
234+
[
235+
pytest.param(
236+
{
237+
"shape": (20,),
238+
"dtype": "float64",
239+
"chunks": (10,),
240+
"shards": None,
241+
"compressors": None,
242+
},
243+
id="unsharded",
244+
),
245+
pytest.param(
246+
{
247+
"shape": (20,),
248+
"dtype": "float64",
249+
"chunks": (10,),
250+
"shards": (20,),
251+
"compressors": None,
252+
},
253+
id="sharded",
254+
),
255+
],
256+
)
257+
async def test_write_empty_chunks_false(pipeline_class: str, arr_kwargs: dict[str, Any]) -> None:
258+
"""With write_empty_chunks=False, writing fill_value should not persist the chunk."""
259+
store = MemoryStore()
260+
arr = zarr.create_array(
261+
store=store,
262+
fill_value=0.0,
263+
config={"write_empty_chunks": False},
264+
**arr_kwargs,
265+
)
266+
# Write non-fill to first chunk, fill_value to second chunk
267+
arr[0:10] = np.arange(10, dtype="float64") + 1
268+
arr[10:20] = np.zeros(10, dtype="float64") # all fill_value
269+
270+
# Read back — both chunks should return correct data
271+
np.testing.assert_array_equal(arr[0:10], np.arange(10, dtype="float64") + 1)
272+
np.testing.assert_array_equal(arr[10:20], np.zeros(10, dtype="float64"))
273+
274+
275+
async def test_write_empty_chunks_true(pipeline_class: str) -> None:
276+
"""With write_empty_chunks=True, fill_value chunks should still be stored."""
277+
with zarr_config.set({"array.write_empty_chunks": True, "array.read_missing_chunks": False}):
278+
store = MemoryStore()
279+
arr = zarr.create_array(
280+
store=store,
281+
shape=(20,),
282+
dtype="float64",
283+
chunks=(10,),
284+
shards=None,
285+
compressors=None,
286+
fill_value=0.0,
287+
)
288+
arr[:] = 0.0 # all fill_value
289+
290+
# Chunks should exist even though they're fill — reading with
291+
# read_missing_chunks=False should NOT raise
292+
result = arr[:]
293+
np.testing.assert_array_equal(result, np.zeros(20, dtype="float64"))
294+
295+
296+
async def test_read_missing_chunks_false_raises(pipeline_class: str) -> None:
297+
"""With read_missing_chunks=False, reading a missing chunk should raise."""
298+
with zarr_config.set({"array.read_missing_chunks": False}):
299+
store = MemoryStore()
300+
arr = zarr.create_array(
301+
store=store,
302+
shape=(20,),
303+
dtype="float64",
304+
chunks=(10,),
305+
shards=None,
306+
compressors=None,
307+
fill_value=0.0,
308+
)
309+
# Don't write anything — all chunks are missing
310+
with pytest.raises(ChunkNotFoundError):
311+
arr[:]
312+
313+
314+
async def test_read_missing_chunks_true_fills(pipeline_class: str) -> None:
315+
"""With read_missing_chunks=True (default), missing chunks return fill_value."""
316+
store = MemoryStore()
317+
arr = zarr.create_array(
318+
store=store,
319+
shape=(20,),
320+
dtype="float64",
321+
chunks=(10,),
322+
shards=None,
323+
compressors=None,
324+
fill_value=-999.0,
325+
)
326+
# Don't write anything
327+
np.testing.assert_array_equal(arr[:], np.full(20, -999.0))

0 commit comments

Comments
 (0)