diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0dfc5eb55a..4a048b91c2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -38,6 +38,7 @@ import warnings from abc import ABC, abstractmethod from collections.abc import Callable, Iterable, Iterator +from contextlib import contextmanager from copy import copy from dataclasses import dataclass from enum import Enum @@ -1601,6 +1602,19 @@ def _get_column_projection_values( return projected_missing_fields +@contextmanager +def _handle_unsupported_pyarrow_uuid_filter() -> Iterator[None]: + try: + yield + except pyarrow.lib.ArrowNotImplementedError as e: + if "arrow.uuid" in str(e): + raise NotImplementedError( + "Filtering on UUID columns is not supported. " + "See https://github.com/apache/iceberg-python/issues/2372 for context." + ) from e + raise + + def _task_to_record_batches( io: FileIO, task: FileScanTask, @@ -1645,14 +1659,15 @@ def _task_to_record_batches( file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - fragment_scanner = ds.Scanner.from_fragment( - fragment=fragment, - schema=physical_schema, - # This will push down the query to Arrow. - # But in case there are positional deletes, we have to apply them first - filter=pyarrow_filter if not positional_deletes else None, - columns=[col.name for col in file_project_schema.columns], - ) + with _handle_unsupported_pyarrow_uuid_filter(): + fragment_scanner = ds.Scanner.from_fragment( + fragment=fragment, + schema=physical_schema, + # This will push down the query to Arrow. + # But in case there are positional deletes, we have to apply them first + filter=pyarrow_filter if not positional_deletes else None, + columns=[col.name for col in file_project_schema.columns], + ) next_index = 0 batches = fragment_scanner.to_batches() @@ -1674,7 +1689,9 @@ def _task_to_record_batches( if pyarrow_filter is not None: # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 ) table = pa.Table.from_batches([current_batch]) - table = table.filter(pyarrow_filter) + with _handle_unsupported_pyarrow_uuid_filter(): + table = table.filter(pyarrow_filter) + # skip empty batches if table.num_rows == 0: continue diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2170741bdd..cb113f7a69 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -57,6 +57,7 @@ BoundStartsWith, GreaterThan, Not, + NotEqualTo, Or, ) from pyiceberg.expressions.literals import literal @@ -113,6 +114,7 @@ TimestampType, TimestamptzType, TimeType, + UUIDType, ) from tests.catalog.test_base import InMemoryCatalog from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES @@ -1156,6 +1158,95 @@ def _set_spec_id(datafile: DataFile) -> DataFile: ) +def test_projection_uuid_filter_with_physical_fixed_uuid(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = pa.schema( + [ + pa.field("uuid_col", pa.binary(16), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"1"}), + ], + metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}, + ) + filepath = _write_table_to_file( + f"file:{tmpdir}/uuid_fixed.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.binary(16))], schema=pyarrow_schema), + ) + + actual = project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value))) + + assert actual["uuid_col"].to_pylist() == [] + + +def test_projection_uuid_filter_with_arrow_uuid_raises_public_error_from_scanner(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}) + filepath = _write_table_to_file( + f"file:{tmpdir}/uuid_logical.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema), + ) + + with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info: + project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value))) + + assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value) + + +def test_projection_uuid_filter_with_arrow_uuid_and_positional_deletes_raises_public_error(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}) + data_file_path = _write_table_to_file( + f"file:{tmpdir}/uuid_logical_with_delete.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema), + ) + delete_file_path = f"{tmpdir}/uuid_position_deletes.parquet" + pq.write_table(pa.table({"file_path": [data_file_path], "pos": [1]}), delete_file_path) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=1, + file_size_in_bytes=1, + ) + data_file.spec_id = 0 + + with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info: + ArrowScan( + table_metadata=TableMetadataV2( + location="file://a/b/", + last_column_id=1, + format_version=2, + schemas=[schema], + partition_specs=[PartitionSpec()], + ), + io=PyArrowFileIO(), + projected_schema=schema, + row_filter=NotEqualTo("uuid_col", str(uuid_value)), + case_sensitive=True, + ).to_table( + tasks=[ + FileScanTask( + data_file=data_file, + delete_files={ + DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PARQUET, + ) + }, + ) + ] + ) + + assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value) + + def test_projection_add_column(file_int: str) -> None: schema = Schema( # All new IDs