forked from zarr-developers/zarr-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgzip.py
More file actions
89 lines (68 loc) · 2.91 KB
/
gzip.py
File metadata and controls
89 lines (68 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING
from numcodecs.gzip import GZip
from zarr.abc.codec import BytesBytesCodec
from zarr.core.buffer.cpu import as_numpy_array_wrapper
from zarr.core.common import JSON, parse_named_configuration
if TYPE_CHECKING:
from typing import Self
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer
def parse_gzip_level(data: JSON) -> int:
    """Validate a gzip compression level and return it unchanged.

    Parameters
    ----------
    data
        Candidate compression level.

    Returns
    -------
    int
        ``data`` itself, once it is known to be an integer from 0 to 9.

    Raises
    ------
    TypeError
        If ``data`` is not an ``int``, or is a ``bool``.
    ValueError
        If ``data`` lies outside the inclusive range 0..9.
    """
    # bool is a subclass of int, so a bare isinstance(data, int) check would
    # let True/False through and have them stored as the level.
    if isinstance(data, bool) or not isinstance(data, int):
        raise TypeError(f"Expected int, got {type(data)}")
    if data not in range(10):
        raise ValueError(
            f"Expected an integer from the inclusive range (0, 9). Got {data} instead."
        )
    return data
@dataclass(frozen=True)
class GzipCodec(BytesBytesCodec):
    """Bytes-to-bytes codec that compresses chunks with numcodecs' ``GZip``.

    Attributes
    ----------
    level
        Compression level, validated by ``parse_gzip_level`` on construction.
    """

    is_fixed_size = False

    level: int = 5

    def __init__(self, *, level: int = 5) -> None:
        """Validate ``level`` and store it on the instance."""
        # The dataclass is frozen, so ordinary attribute assignment is
        # unavailable here; go through object.__setattr__ instead.
        object.__setattr__(self, "level", parse_gzip_level(level))

    @classmethod
    def from_dict(cls, data: dict[str, JSON]) -> Self:
        """Build a codec from a ``{"name": ..., "configuration": ...}`` mapping."""
        # Only the configuration half of the parsed pair is needed.
        _name, config = parse_named_configuration(data, "gzip")
        return cls(**config)  # type: ignore[arg-type]

    def to_dict(self) -> dict[str, JSON]:
        """Return the codec's name and configuration as a plain dict."""
        configuration: dict[str, JSON] = {"level": self.level}
        return {"name": "gzip", "configuration": configuration}

    @cached_property
    def _gzip_codec(self) -> GZip:
        """The numcodecs ``GZip`` instance backing this codec.

        Built on first access and then reused by every encode/decode call on
        this instance; ``level`` is fixed at construction because the
        dataclass is frozen.
        """
        codec = GZip(self.level)
        return codec

    def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Decompress ``chunk_bytes`` using the cached ``GZip`` instance."""
        decode = self._gzip_codec.decode
        return as_numpy_array_wrapper(decode, chunk_bytes, chunk_spec.prototype)

    def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
        """Compress ``chunk_bytes`` using the cached ``GZip`` instance."""
        encode = self._gzip_codec.encode
        return as_numpy_array_wrapper(encode, chunk_bytes, chunk_spec.prototype)

    async def _decode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer:
        """Run ``_decode_sync`` through ``asyncio.to_thread`` and await it."""
        decoded = await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)
        return decoded

    async def _encode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None:
        """Run ``_encode_sync`` through ``asyncio.to_thread`` and await it."""
        encoded = await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)
        return encoded

    def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        """Not supported by this codec; always raises ``NotImplementedError``."""
        raise NotImplementedError