Commit 4f1e2f4 (1 parent: 82c3d83)

support VARIANT for pypaimon

12 files changed: 3894 additions & 34 deletions

File tree

docs/content/pypaimon/python-api.md (202 additions & 16 deletions)

@@ -687,22 +687,208 @@ Row kind values:
## Data Types
### Scalar Types

| Python Native Type  | PyArrow Type                           | Paimon Type                       |
|:--------------------|:---------------------------------------|:----------------------------------|
| `int`               | `pyarrow.int8()`                       | `TINYINT`                         |
| `int`               | `pyarrow.int16()`                      | `SMALLINT`                        |
| `int`               | `pyarrow.int32()`                      | `INT`                             |
| `int`               | `pyarrow.int64()`                      | `BIGINT`                          |
| `float`             | `pyarrow.float32()`                    | `FLOAT`                           |
| `float`             | `pyarrow.float64()`                    | `DOUBLE`                          |
| `bool`              | `pyarrow.bool_()`                      | `BOOLEAN`                         |
| `str`               | `pyarrow.string()`                     | `STRING`, `CHAR(n)`, `VARCHAR(n)` |
| `bytes`             | `pyarrow.binary()`                     | `BYTES`, `VARBINARY(n)`           |
| `bytes`             | `pyarrow.binary(length)`               | `BINARY(length)`                  |
| `bytes`             | `pyarrow.large_binary()`               | `BLOB`                            |
| `decimal.Decimal`   | `pyarrow.decimal128(precision, scale)` | `DECIMAL(precision, scale)`       |
| `datetime.datetime` | `pyarrow.timestamp(unit, tz=None)`     | `TIMESTAMP(p)` — unit: `'s'` p=0, `'ms'` p=1–3, `'us'` p=4–6, `'ns'` p=7–9 |
| `datetime.datetime` | `pyarrow.timestamp(unit, tz='UTC')`    | `TIMESTAMP_LTZ(p)` — same unit/p mapping as above |
| `datetime.date`     | `pyarrow.date32()`                     | `DATE`                            |
| `datetime.time`     | `pyarrow.time32('ms')`                 | `TIME(p)`                         |
711+
### Complex Types
712+
713+
| Python Native Type | PyArrow Type | Paimon Type |
714+
|:-------------------|:--------------------------------------|:-----------------------|
715+
| `list` | `pyarrow.list_(element_type)` | `ARRAY<element_type>` |
716+
| `dict` | `pyarrow.map_(key_type, value_type)` | `MAP<key, value>` |
717+
| `dict` | `pyarrow.struct([field, ...])` | `ROW<field ...>` |
718+
719+
### VARIANT Type

`VARIANT` stores semi-structured, schema-flexible data (JSON objects, arrays, and primitives)
in the [Parquet Variant binary encoding](https://github.com/apache/parquet-format/blob/master/VariantEncoding.md).

pypaimon exposes VARIANT columns as Arrow `struct<value: binary NOT NULL, metadata: binary NOT NULL>` and
provides `GenericVariant` for encoding, decoding, and path extraction.

Paimon supports two Parquet storage layouts for VARIANT:

- **Plain VARIANT** — the standard two-field struct (`value` + `metadata`). Default for all writes.
- **Shredded VARIANT** — typed sub-columns are stored alongside overflow bytes, enabling column-skipping
  inside the Parquet file. Controlled by the `variant.shreddingSchema` table option.
{{< tabs "variant-read-write" >}}
{{< tab "Plain VARIANT" >}}

**Read**

A VARIANT column arrives as `struct<value: binary, metadata: binary>` in every Arrow batch.
Use `GenericVariant.from_arrow_struct` to decode each row:

```python
from pypaimon.data.generic_variant import GenericVariant

read_builder = table.new_read_builder()
result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())

for record in result.to_pylist():
    if (payload := record["payload"]) is not None:
        gv = GenericVariant.from_arrow_struct(payload)
        print(gv.to_python())  # decode to Python dict / list / scalar
        print(gv.to_json())    # decode to JSON string
```

`from_arrow_struct` is a lightweight operation — it only wraps the two raw byte arrays without
parsing them. Actual variant binary decoding is deferred to `to_python()` / `to_json()`.
**Write**

Build `GenericVariant` values and convert them to an Arrow column with `to_arrow_array`:

```python
import pyarrow as pa
from pypaimon.data.generic_variant import GenericVariant

gv1 = GenericVariant.from_json('{"city": "Beijing", "age": 30}')
gv2 = GenericVariant.from_python({'tags': [1, 2, 3], 'active': True})
# None represents SQL NULL

data = pa.table({
    'id': pa.array([1, 2, 3], type=pa.int32()),
    'payload': GenericVariant.to_arrow_array([gv1, gv2, None]),
})

write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(data)
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
```

{{< /tab >}}
{{< tab "Shredded VARIANT" >}}

In shredded mode the VARIANT column is physically split inside Parquet into a three-field group:

```
payload (GROUP)
├── metadata     BYTE_ARRAY            -- key dictionary (always present)
├── value        BYTE_ARRAY  OPTIONAL  -- overflow bytes for un-shredded fields
└── typed_value (GROUP)      OPTIONAL
    ├── age (GROUP)
    │   ├── value        BYTE_ARRAY  OPTIONAL
    │   └── typed_value  INT64       OPTIONAL
    └── city (GROUP)
        ├── value        BYTE_ARRAY  OPTIONAL
        └── typed_value  BYTE_ARRAY  OPTIONAL
```
**Read — automatic reassembly**

When pypaimon reads a Parquet file that contains shredded VARIANT columns (whether written by Paimon Java
or by pypaimon with shredding enabled), it **automatically detects and reassembles** them back to the
standard `struct<value, metadata>` form before returning any batch. No code changes are needed on the
read side:

```python
from pypaimon.data.generic_variant import GenericVariant

# Works identically for both shredded and plain Parquet files
read_builder = table.new_read_builder()
result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())

for record in result.to_pylist():
    if (payload := record["payload"]) is not None:
        gv = GenericVariant.from_arrow_struct(payload)  # same API as plain VARIANT
        print(gv.to_python())
```

Reassembly (reconstructing the variant binary from `typed_value` sub-columns and overflow bytes)
happens inside `FormatPyArrowReader.read_arrow_batch()` — that is, **at batch read time**, before
the Arrow data is returned to the caller. Note: when sub-field projection is active
(`with_variant_sub_fields`), reassembly is skipped entirely and only the requested typed
sub-columns are decoded.
**Write — shredding mode**

Set the `variant.shreddingSchema` table option to a JSON-encoded `ROW` type that describes which
sub-fields of which VARIANT columns to shred. The top-level fields map VARIANT column names to their
sub-field schemas:

```python
import json

shredding_schema = json.dumps({
    "type": "ROW",
    "fields": [
        {
            "id": 0,
            "name": "payload",  # VARIANT column name in the table
            "type": {
                "type": "ROW",
                "fields": [  # sub-fields to extract as typed columns
                    {"id": 0, "name": "age", "type": "BIGINT"},
                    {"id": 1, "name": "city", "type": "VARCHAR"},
                ]
            }
        }
    ]
})

# Pass the option when creating the table
schema = Schema.from_pyarrow_schema(
    pa_schema,
    options={'variant.shreddingSchema': shredding_schema}
)
catalog.create_table('db.events', schema, ignore_if_exists=True)
```
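For tables with several VARIANT columns, the JSON above can be generated programmatically. A minimal sketch (a hypothetical stdlib-only helper, not part of pypaimon):

```python
import json


def build_shredding_schema(columns):
    """Build a variant.shreddingSchema JSON string.

    `columns` maps each VARIANT column name to a list of
    (sub_field_name, paimon_type) pairs.  Hypothetical helper.
    """
    fields = []
    for i, (col, sub_fields) in enumerate(columns.items()):
        fields.append({
            "id": i,
            "name": col,
            "type": {
                "type": "ROW",
                "fields": [
                    {"id": j, "name": name, "type": ptype}
                    for j, (name, ptype) in enumerate(sub_fields)
                ],
            },
        })
    return json.dumps({"type": "ROW", "fields": fields})


schema_json = build_shredding_schema({
    "payload": [("age", "BIGINT"), ("city", "VARCHAR")],
})
```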
860+
861+
Once the option is set, each `write_arrow` call transparently converts VARIANT columns to the shredded
862+
Parquet layout. The read path — including Java Paimon and other engines — can then exploit the typed
863+
sub-columns for column-skipping via sub-field projection.
864+
865+
Fields not listed in `variant.shreddingSchema` are stored in the overflow `value` bytes and remain
866+
fully accessible on the read path.
867+
868+
Supported Paimon type strings for shredded sub-fields: `BOOLEAN`, `INT`, `BIGINT`, `FLOAT`, `DOUBLE`,
869+
`VARCHAR`, `DECIMAL(p,s)`, and nested `ROW` types for recursive object shredding.
870+
871+
{{< /tab >}}
872+
{{< /tabs >}}
873+
874+
**`GenericVariant` API:**

| Method | Description |
|:-------|:------------|
| `GenericVariant.from_json(json_str)` | Build from a JSON string |
| `GenericVariant.from_python(obj)` | Build from a Python object (`dict`, `list`, `int`, `str`, …) |
| `GenericVariant.from_arrow_struct({"value": b"...", "metadata": b"..."})` | Wrap raw bytes from an Arrow VARIANT struct row (read path) |
| `GenericVariant.to_arrow_array([gv1, gv2, None, ...])` | Convert a list of `GenericVariant` (or `None`) to a `pa.StructArray` for writing |
| `gv.to_python()` | Decode to native Python (`dict`, `list`, `int`, `str`, `None`, …) |
| `gv.to_json()` | Decode to a JSON string |
| `gv.value()` | Return raw value bytes |
| `gv.metadata()` | Return raw metadata bytes |

**Limitations:**

- `VARIANT` is only supported with the Parquet file format. Writing to ORC or Avro raises `NotImplementedError`.
- `VARIANT` cannot be used as a primary key or partition key.

## Predicate
