Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import warnings
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable, Iterator
from contextlib import contextmanager
from copy import copy
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -1601,6 +1602,19 @@ def _get_column_projection_values(
return projected_missing_fields


@contextmanager
def _handle_unsupported_pyarrow_uuid_filter() -> Iterator[None]:
try:
yield
except pyarrow.lib.ArrowNotImplementedError as e:
if "arrow.uuid" in str(e):
raise NotImplementedError(
"Filtering on UUID columns is not supported. "
"See https://github.com/apache/iceberg-python/issues/2372 for context."
) from e
raise


def _task_to_record_batches(
io: FileIO,
task: FileScanTask,
Expand Down Expand Up @@ -1645,14 +1659,15 @@ def _task_to_record_batches(

file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

fragment_scanner = ds.Scanner.from_fragment(
fragment=fragment,
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
columns=[col.name for col in file_project_schema.columns],
)
with _handle_unsupported_pyarrow_uuid_filter():
fragment_scanner = ds.Scanner.from_fragment(
fragment=fragment,
schema=physical_schema,
# This will push down the query to Arrow.
# But in case there are positional deletes, we have to apply them first
filter=pyarrow_filter if not positional_deletes else None,
columns=[col.name for col in file_project_schema.columns],
)

next_index = 0
batches = fragment_scanner.to_batches()
Expand All @@ -1674,7 +1689,9 @@ def _task_to_record_batches(
if pyarrow_filter is not None:
# Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 )
table = pa.Table.from_batches([current_batch])
table = table.filter(pyarrow_filter)
with _handle_unsupported_pyarrow_uuid_filter():
table = table.filter(pyarrow_filter)

# skip empty batches
if table.num_rows == 0:
continue
Expand Down
91 changes: 91 additions & 0 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
BoundStartsWith,
GreaterThan,
Not,
NotEqualTo,
Or,
)
from pyiceberg.expressions.literals import literal
Expand Down Expand Up @@ -113,6 +114,7 @@
TimestampType,
TimestamptzType,
TimeType,
UUIDType,
)
from tests.catalog.test_base import InMemoryCatalog
from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES
Expand Down Expand Up @@ -1156,6 +1158,95 @@ def _set_spec_id(datafile: DataFile) -> DataFile:
)


def test_projection_uuid_filter_with_physical_fixed_uuid(tmpdir: str) -> None:
schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False))
uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b")
pyarrow_schema = pa.schema(
[
pa.field("uuid_col", pa.binary(16), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"1"}),
],
metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)},
)
filepath = _write_table_to_file(
f"file:{tmpdir}/uuid_fixed.parquet",
pyarrow_schema,
pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.binary(16))], schema=pyarrow_schema),
)

actual = project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value)))

assert actual["uuid_col"].to_pylist() == []


def test_projection_uuid_filter_with_arrow_uuid_raises_public_error_from_scanner(tmpdir: str) -> None:
schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False))
uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b")
pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)})
filepath = _write_table_to_file(
f"file:{tmpdir}/uuid_logical.parquet",
pyarrow_schema,
pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema),
)

with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info:
project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value)))

assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value)


def test_projection_uuid_filter_with_arrow_uuid_and_positional_deletes_raises_public_error(tmpdir: str) -> None:
schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False))
uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b")
pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)})
data_file_path = _write_table_to_file(
f"file:{tmpdir}/uuid_logical_with_delete.parquet",
pyarrow_schema,
pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema),
)
delete_file_path = f"{tmpdir}/uuid_position_deletes.parquet"
pq.write_table(pa.table({"file_path": [data_file_path], "pos": [1]}), delete_file_path)

data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=data_file_path,
file_format=FileFormat.PARQUET,
partition={},
record_count=1,
file_size_in_bytes=1,
)
data_file.spec_id = 0

with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info:
ArrowScan(
table_metadata=TableMetadataV2(
location="file://a/b/",
last_column_id=1,
format_version=2,
schemas=[schema],
partition_specs=[PartitionSpec()],
),
io=PyArrowFileIO(),
projected_schema=schema,
row_filter=NotEqualTo("uuid_col", str(uuid_value)),
case_sensitive=True,
).to_table(
tasks=[
FileScanTask(
data_file=data_file,
delete_files={
DataFile.from_args(
content=DataFileContent.POSITION_DELETES,
file_path=delete_file_path,
file_format=FileFormat.PARQUET,
)
},
)
]
)

assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value)


def test_projection_add_column(file_int: str) -> None:
schema = Schema(
# All new IDs
Expand Down