Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ebd28a5
Initial version of CTable dictionary/categorical column type
FrancescAlted May 11, 2026
f0996fe
Add missing module
FrancescAlted May 11, 2026
3bdeb93
Add nested dotted column access and where-expression support in CTable
FrancescAlted May 11, 2026
ebe31e8
Add dotted nested column support and hierarchical _cols storage paths
FrancescAlted May 11, 2026
1311156
Add nested schema v2 metadata and preserve empty-root Arrow names thr…
FrancescAlted May 11, 2026
3e6458b
Added logical nested selector plumbing so metadata mapping is actuall…
FrancescAlted May 11, 2026
a695c42
Implement nested-field CTable pipeline with flattened storage, logica…
FrancescAlted May 11, 2026
a139b49
Skip tests when PyArrow is not installed
FrancescAlted May 11, 2026
b8188dd
Implemented append/extend with nested dict
FrancescAlted May 12, 2026
0e0a970
Add docs for new nested field feature
FrancescAlted May 12, 2026
693ec32
Implemented field-name escaping for literal `.` and `/`
FrancescAlted May 12, 2026
b9de583
Add updated plan
FrancescAlted May 12, 2026
864dc36
OPT: validate only when importing Arrow's Python datetime values
FrancescAlted May 12, 2026
3681622
New list_serializer param and parquet import optimizations
FrancescAlted May 12, 2026
4c3b889
Fix test
FrancescAlted May 12, 2026
5b40792
CTable separate nested columns for list-struct data (see plan)
FrancescAlted May 13, 2026
d576708
New max_rows param for CTable.from_parquet()
FrancescAlted May 13, 2026
301614a
separate_nested_cols changed to True in CTable.from_parquet() (and CLI)
FrancescAlted May 13, 2026
e464416
Add --progress to parquet-to-blosc2; print progress every N batches
FrancescAlted May 13, 2026
19ceef9
Make --separate-nested-cols in parquet-to-blosc2 to always separate c…
FrancescAlted May 13, 2026
248f323
Add ETA to progress line indicator
FrancescAlted May 13, 2026
782f0e3
Improve parquet-to-blosc2 nested import batching and progress
FrancescAlted May 13, 2026
dd3331e
Parquet imports default now to Arrow serializer
FrancescAlted May 13, 2026
0166723
Output beautification
FrancescAlted May 13, 2026
6e7a31e
Updated plan
FrancescAlted May 13, 2026
cd48deb
Clarifying that is not a goal that blosc2 and parquet should be total…
FrancescAlted May 13, 2026
5b70933
Make CTable.nrows lazy for speeding up opening files
FrancescAlted May 13, 2026
66f92d7
Assume valid extensions for speeding up opening stores
FrancescAlted May 13, 2026
0a70db1
Allow for lazy metainfo loading of columns for fast open()
FrancescAlted May 13, 2026
83fc6c3
Optimizations for importing blosc2 package
FrancescAlted May 14, 2026
1c88cd6
Merge branch 'main' into ctable-dict-spec
FrancescAlted May 14, 2026
de8007c
Trimmed default parametrization matrices and mark broader combination…
FrancescAlted May 14, 2026
d490485
Merge branch 'ctable-dict-spec' of github.com:Blosc/python-blosc2 int…
FrancescAlted May 14, 2026
3a0714f
Mark torch-dependent tests as heavy
FrancescAlted May 14, 2026
7443200
Allow for nested columns in where conditions
FrancescAlted May 14, 2026
93337b2
New Column.info, and ability to include dict types in where clauses
FrancescAlted May 14, 2026
67c0b64
Extracting data from BatchArray is expensive; use placeholders instead
FrancescAlted May 14, 2026
8ca7a80
Do not convert to python when displaying numpy data
FrancescAlted May 14, 2026
60925e1
Accelerated the batch/list scalar/small-slice access path; no placeho…
FrancescAlted May 14, 2026
1a0d6c5
Adapt the printing of a table to the geometry of the terminal
FrancescAlted May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions bench/ctable/bench_nested_parquet_roundtrip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python

from __future__ import annotations

import argparse
import os
import shutil
import tempfile
import time
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq

import blosc2


def _dir_size(path: Path) -> int:
total = 0
for root, _, files in os.walk(path):
for f in files:
total += (Path(root) / f).stat().st_size
return total


def main() -> None:
    """CLI driver: import a Parquet file into a CTable, save it, export it back.

    Prints timings and on-disk sizes for each leg of the roundtrip.  All
    outputs live in a throwaway temp directory unless --keep is given.
    """
    parser = argparse.ArgumentParser(description="Benchmark CTable nested Parquet roundtrip")
    parser.add_argument("parquet", help="Input Parquet file")
    parser.add_argument("--rows", type=int, default=0, help="Sample first N rows (0 = full file)")
    parser.add_argument("--keep", action="store_true", help="Keep temporary outputs")
    opts = parser.parse_args()

    source = Path(opts.parquet)
    if not source.exists():
        raise FileNotFoundError(source)

    workdir = Path(tempfile.mkdtemp(prefix="b2-nested-bench-"))
    sample_path = workdir / "sample.parquet"
    out_b2d = workdir / "out.b2d"
    out_parquet = workdir / "out.parquet"

    try:
        input_path = source
        if opts.rows > 0:
            # Materialize a smaller input: only the first batch of --rows rows.
            pf = pq.ParquetFile(source)
            first_batch = next(pf.iter_batches(batch_size=opts.rows))
            sampled = pa.Table.from_batches([first_batch], schema=pf.schema_arrow)
            pq.write_table(sampled, sample_path)
            input_path = sample_path

        t_start = time.perf_counter()
        table = blosc2.CTable.from_parquet(str(input_path))
        t_imported = time.perf_counter()

        table.save(str(out_b2d), overwrite=True)
        t_saved = time.perf_counter()

        table.to_parquet(str(out_parquet))
        t_exported = time.perf_counter()

        print("=== CTable nested Parquet roundtrip benchmark ===")
        print(f"input: {input_path}")
        print(f"rows: {table.nrows}")
        print(f"columns: {len(table.col_names)}")
        print(f"from_parquet (s): {t_imported - t_start:.3f}")
        print(f"save b2d (s): {t_saved - t_imported:.3f}")
        print(f"to_parquet (s): {t_exported - t_saved:.3f}")
        print(f"input bytes: {input_path.stat().st_size}")
        print(f"output parquet: {out_parquet.stat().st_size}")
        print(f"output b2d bytes: {_dir_size(out_b2d)}")
        print(f"workdir: {workdir}")

        if not opts.keep:
            shutil.rmtree(workdir)
    except Exception:
        # Best-effort cleanup on failure, then propagate the error.
        if not opts.keep:
            shutil.rmtree(workdir, ignore_errors=True)
        raise


if __name__ == "__main__":
    main()
118 changes: 118 additions & 0 deletions bench/ctable/where-nulls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#######################################################################

"""Create a persistent nullable CTable for where() benchmarks.

Usage:
python bench/ctable/where-nulls.py table.b2d
python bench/ctable/where-nulls.py table.b2z
"""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from pathlib import Path
from time import perf_counter

import numpy as np

import blosc2

# Benchmark-scale parameters.  NULL_VALUE is the null sentinel for col1/col2;
# the generated data is centered at 500, so sentinel hits (nulls) are common.
# RNG_SEED fixes the generator so runs are reproducible.
NROWS = 500_000_000
NULL_VALUE = 500
RNG_SEED = 42


@dataclass
class Row:
    """Schema for the benchmark table: a row index plus two nullable columns."""

    # Monotonically numbered row index (non-negative int64).
    nrow: int = blosc2.field(blosc2.int64(ge=0))
    # col1/col2 are constrained to [0, 1000]; values equal to NULL_VALUE (500)
    # act as the null sentinel, and default=None makes the fields nullable.
    col1: int = blosc2.field(blosc2.int64(ge=0, le=1000, null_value=NULL_VALUE), default=None)
    col2: int = blosc2.field(blosc2.int64(ge=0, le=1000, null_value=NULL_VALUE), default=None)


# NumPy structured dtype mirroring the Row schema; used to build the in-memory
# data that is appended to the table in bulk.
DTYPE = np.dtype(
    [
        ("nrow", np.int64),
        ("col1", np.int64),
        ("col2", np.int64),
    ]
)


def parse_args() -> argparse.Namespace:
    """Parse the command line: a single positional output path."""
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        "urlpath",
        help="Output table path. Use a .b2d directory or a .b2z file extension.",
    )
    return cli.parse_args()


def check_urlpath(urlpath: str) -> str:
    """Validate the output extension and return it without the leading dot.

    Exits with an error message when the path is neither .b2d nor .b2z.
    """
    extension = Path(urlpath).suffix
    if extension in {".b2d", ".b2z"}:
        return extension[1:]
    raise SystemExit("urlpath must end in .b2d (directory-backed) or .b2z (zip-backed)")


def make_nullable_column(rng: np.random.Generator) -> np.ndarray:
# Normal distribution centered at 500, with practically all values in [0, 1000].
return np.rint(rng.normal(loc=500, scale=50, size=NROWS)).clip(0, 1000).astype(np.int64)


def make_data() -> np.ndarray:
    """Build the full structured array: a row index plus two random columns."""
    rng = np.random.default_rng(RNG_SEED)
    records = np.empty(NROWS, dtype=DTYPE)
    records["nrow"] = np.arange(NROWS, dtype=np.int64)
    # Fill col1 then col2 from the same generator, preserving the draw order.
    for name in ("col1", "col2"):
        records[name] = make_nullable_column(rng)
    return records


def fmt_bytes(nbytes: int) -> str:
    """Format *nbytes* using binary units (B, KiB, MiB, GiB).

    Byte counts render as plain integers; larger units use two decimals.
    Values of 1024 GiB or more stay expressed in GiB (no TiB step),
    matching the original behavior.

    Note: the previous version ended with an unreachable ``return`` after
    a loop that always returned on the GiB iteration; that dead code has
    been removed.
    """
    if abs(nbytes) < 1024:
        return f"{nbytes} B"
    value = float(nbytes)
    for unit in ("KiB", "MiB"):
        value /= 1024
        if abs(value) < 1024:
            return f"{value:.2f} {unit}"
    # Anything left is at least 1 MiB-scale after two divisions: report in GiB.
    return f"{value / 1024:.2f} GiB"


def main() -> None:
    """Create the persistent nullable benchmark table and print a summary.

    Reads the output path from the command line, generates NROWS rows of
    synthetic data, bulk-loads them into a CTable, and reports sizes,
    compression ratio, and null counts.
    """
    args = parse_args()
    format_name = check_urlpath(args.urlpath)

    t0 = perf_counter()
    data = make_data()
    # Count sentinel hits per column: these rows read back as nulls.
    nulls_col1 = int(np.count_nonzero(data["col1"] == NULL_VALUE))
    nulls_col2 = int(np.count_nonzero(data["col2"] == NULL_VALUE))

    # NOTE(review): validate=False presumably skips per-row schema validation
    # for speed (the generated data is known-good) — confirm against CTable docs.
    table = blosc2.CTable(Row, urlpath=args.urlpath, mode="w", expected_size=NROWS, validate=False)
    table.extend(data, validate=False)
    elapsed = perf_counter() - t0

    print("CTable nullable where() benchmark data created")
    print("=" * 52)
    print(f"urlpath: {args.urlpath}")
    print(f"format: {format_name}")
    print(f"rows: {len(table):,}")
    print(f"columns: {', '.join(table.col_names)}")
    print(f"null sentinel: {NULL_VALUE}")
    print(f"col1 nulls: {nulls_col1:,}")
    print(f"col2 nulls: {nulls_col2:,}")
    print(f"uncompressed: {fmt_bytes(table.nbytes)}")
    print(f"compressed: {fmt_bytes(table.cbytes)}")
    print(f"compression: {table.cratio:.2f}x")
    print(f"creation time: {elapsed:.3f} s")
    print()
    print(table)

    table.close()


if __name__ == "__main__":
    main()
137 changes: 137 additions & 0 deletions bench/large-dict-store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# This source code is licensed under a BSD-style license (found in the
# LICENSE file in the root directory of this source tree)
#######################################################################
import os
import time
import numpy as np
import blosc2
from blosc2 import DictStore
from memory_profiler import memory_usage

def make_arrays(n, min_size, max_size, dtype="f8"):
    """Create *n* blosc2 linspace arrays with linearly growing element counts.

    Returns (arrays, sizes, uncompressed_size) and prints the total
    uncompressed footprint in GB.
    """
    sizes = np.linspace(min_size, max_size, n).astype(int)
    arrays = [blosc2.linspace(0, 1, size, dtype=dtype) for size in sizes]
    total_nbytes = sum(arr.nbytes for arr in arrays)
    print(f"Uncompressed data size: {total_nbytes / 1e9:.2f} GB")
    return arrays, sizes, total_nbytes

def get_file_size(filepath):
    """Return the size of *filepath* in MiB, or 0 when it does not exist."""
    if not os.path.exists(filepath):
        return 0
    return os.path.getsize(filepath) / 2**20

def check_arrays(tree_path, arrays, prefix="node"):
    """Verify that every array in *arrays* matches its stored copy.

    Opens the DictStore at *tree_path* read-only and compares each entry
    ``/{prefix}{i}`` against ``arrays[i]``.

    Raises
    ------
    ValueError
        On the first mismatching entry.

    Fix: the store is now closed in a ``finally`` block — the original left
    it open, unlike the writer paths which call ``close()``.
    """
    print("Checking stored arrays...")
    tree = DictStore(tree_path, mode="r")
    try:
        for i, arr in enumerate(arrays):
            stored_arr = tree[f"/{prefix}{i}"][:]
            if not np.allclose(arr, stored_arr):
                raise ValueError(f"Array mismatch at {prefix}{i}")
    finally:
        tree.close()

def run_embed_tree(arrays, threshold, tree_path, uncompressed_size, check=False):
    """Benchmark writing *arrays* into a DictStore (embedded storage path).

    Measures wall time and peak memory of the write process and reports
    file size and compression ratio.  Returns (time, peak_mem, file_size).
    """

    def embed_process():
        # Runs under memory_usage() so its allocations are sampled.
        store = DictStore(tree_path, mode="w", threshold=threshold)
        for idx, arr in enumerate(arrays):
            store[f"/node{idx}"] = arr
        store.close()

    start = time.time()
    mem_samples = memory_usage((embed_process, ()), interval=0.1)
    elapsed = time.time() - start
    peak_mem = max(mem_samples) - min(mem_samples)
    file_size = get_file_size(tree_path)
    compression_ratio = uncompressed_size / (file_size * 2**20) if file_size > 0 else 0
    print(f"[Embed] Time: {elapsed:.2f}s, Memory: {peak_mem:.2f} MB, File size: {file_size:.2f} MB,"
          f" Compression: {compression_ratio:.1f}x")

    if check:
        check_arrays(tree_path, arrays, prefix="node")

    return elapsed, peak_mem, file_size

def run_external_tree(arrays, threshold, tree_path, arr_prefix, uncompressed_size, check=False):
    """Benchmark writing *arrays* as external .b2nd files referenced by a DictStore.

    Each array is first persisted to ``{arr_prefix}_node{i}.b2nd`` and then
    registered in the store.  Measures wall time and peak memory of the write
    process and reports store size, external-files size, and compression.
    Returns (time, peak_mem, file_size, total_external_size).
    """
    def external_process():
        # Runs under memory_usage() so its allocations are sampled.
        tree = DictStore(tree_path, mode="w", threshold=threshold)
        for i, arr in enumerate(arrays):
            arr_path = f"{arr_prefix}_node{i}.b2nd"
            arr_b2 = blosc2.asarray(arr, urlpath=arr_path, mode="w")
            tree[f"/node{i}"] = arr_b2
        tree.close()

    t0 = time.time()
    mem_usage = memory_usage((external_process, ()), interval=0.1)
    t1 = time.time()
    # Peak is reported relative to the baseline sample, not absolute RSS.
    peak_mem = max(mem_usage) - min(mem_usage)
    file_size = get_file_size(tree_path)
    total_external_size = sum(get_file_size(f"{arr_prefix}_node{i}.b2nd") for i in range(len(arrays)))
    total_size_mb = (file_size + total_external_size)
    # Sizes are in MiB, so convert back to bytes for the ratio; guard div-by-zero.
    compression_ratio = uncompressed_size / (total_size_mb * 2**20) if total_size_mb > 0 else 0
    print(f"[External] Time: {t1-t0:.2f}s, Memory: {peak_mem:.2f} MB, DictStore file size: {file_size:.2f} MB,"
          f" External files size: {total_external_size:.2f} MB, Total: {total_size_mb:.2f} MB,"
          f" Compression: {compression_ratio:.1f}x")

    if check:
        check_arrays(tree_path, arrays, prefix="node")

    return t1-t0, peak_mem, file_size, total_external_size

def cleanup_files(tree_path, arr_prefix, n):
    """Delete the DictStore file and the *n* external array files, if present."""
    candidates = [tree_path] + [f"{arr_prefix}_node{i}.b2nd" for i in range(n)]
    for candidate in candidates:
        if os.path.exists(candidate):
            os.remove(candidate)

if __name__ == "__main__":
    # Benchmark driver: compare embedded vs external DictStore storage for
    # N arrays of linearly growing size, with and without a size threshold.
    N = 10
    min_size = int(1e6)  # 1 MB
    max_size = int(1e8)  # 100 MB
    threshold = 2**23  # 8 MB threshold before using external arrays
    print(f"Creating {N} arrays with sizes ranging from {min_size / 1e6:.2f} to {max_size / 1e6:.2f} MB...")
    arrays, sizes, uncompressed_size = make_arrays(N, min_size, max_size)

    print("Benchmarking DictStore with embed arrays...")
    tree_path_embed = "large_dict_store_embed.b2z"
    t_embed, mem_embed, file_size_embed = run_embed_tree(arrays, None, tree_path_embed, uncompressed_size)

    print("Benchmarking DictStore with external arrays with threshold...")
    tree_path_external = "large_dict_store_external_threshold.b2z"
    arr_prefix = "large_external"
    t_t_external, mem_t_external, file_t_size_external, external_t_size = (
        run_external_tree(arrays, threshold, tree_path_external, arr_prefix, uncompressed_size))

    print("Benchmarking DictStore with external arrays with no threshold...")
    tree_path_external_noth = "large_dict_store_external_nothreshold.b2z"
    # Use a distinct name instead of rebinding arr_prefix, so both prefixes
    # remain available (the cleanup calls below reference arr_prefix_noth,
    # which was previously never defined).
    arr_prefix_noth = "large_external_noth"
    t_external, mem_external, file_size_external, external_size = (
        run_external_tree(arrays, None, tree_path_external_noth, arr_prefix_noth, uncompressed_size))

    print("\nSummary:")
    print(f"Embed arrays: Time = {t_embed:.2f}s, Memory = {mem_embed:.2f} MB,"
          f" File size = {file_size_embed:.2f} MB")
    print(f"External arrays (th: {threshold / 2**20:.2f} MB): Time = {t_t_external:.2f}s, Memory = {mem_t_external:.2f} MB,"
          f" DictStore file size = {file_t_size_external:.2f} MB, External files size = {external_t_size:.2f} MB")
    print(f"External arrays: Time = {t_external:.2f}s, Memory = {mem_external:.2f} MB,"
          f" DictStore file size = {file_size_external:.2f} MB, External files size = {external_size:.2f} MB")

    speedup = t_embed / t_external if t_external > 0 else float('inf')
    mem_ratio = mem_embed / mem_external if mem_external > 0 else float('inf')
    file_ratio = file_size_embed / file_size_external if file_size_external > 0 else float('inf')
    # Fix: the "embed vs total external" ratio previously divided by the
    # DictStore file size alone (duplicating file_ratio and lacking a
    # zero-guard); compare against the total external footprint instead.
    total_external = file_size_external + external_size
    storage_ratio = file_size_embed / total_external if total_external > 0 else float('inf')
    print(f"Time ratio (embed/external): {speedup:.2f}x")
    print(f"Memory ratio (embed/external): {mem_ratio:.2f}x")
    print(f"File size ratio (embed/external tree): {file_ratio:.2f}x")
    print(f"Storage efficiency (embed vs total external): {storage_ratio:.2f}x")

    # cleanup_files(tree_path_embed, arr_prefix, N)
    # cleanup_files(tree_path_external, arr_prefix, N)
    # cleanup_files(tree_path_external_noth, arr_prefix_noth, N)
Loading
Loading