From 8f28a6e934b12841200ad6a2d17b092d6e34fed4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Anast=C3=A1cio?= Date: Mon, 5 Jan 2026 14:38:49 -0300 Subject: [PATCH 1/2] fix: raise NotImplementedError when filtering by UUID column PyArrow does not support filtering on UUID-typed columns. This commit raises a NotImplementedError with a clear message when such a filter is attempted --- pyiceberg/io/pyarrow.py | 11 ++++++++++- tests/integration/test_reads.py | 32 ++++++++++++++++---------------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0dfc5eb55a..2e811dbedd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -203,6 +203,10 @@ MAP_VALUE_NAME = "value" DOC = "doc" UTC_ALIASES = {"UTC", "+00:00", "Etc/UTC", "Z"} +UUID_FILTER_NOT_SUPPORTED_ERROR_MESSAGE = ( + f"Filtering on UUID columns is not supported by the installed PyArrow version ({pa.__version__})" +) + T = TypeVar("T") @@ -1641,7 +1645,12 @@ def _task_to_record_batches( bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields ) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) - pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) + try: + pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) + except pyarrow.lib.ArrowNotImplementedError as e: + if "arrow.uuid" in str(e): + raise NotImplementedError(UUID_FILTER_NOT_SUPPORTED_ERROR_MESSAGE) from e + raise file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 6c8b4a20a7..6a60c0bfa2 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -820,17 +820,20 @@ def test_partitioned_tables(catalog: Catalog) -> None: @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) def test_unpartitioned_uuid_table(catalog: Catalog) -> None: unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") - arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() - assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")] - - arrow_table_neq = unpartitioned_uuid.scan( - row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" - ).to_arrow() - assert arrow_table_neq["uuid_col"].to_pylist() == [ - uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226"), - uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"), - uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e"), - ] + try: + arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() + assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")] + + arrow_table_neq = unpartitioned_uuid.scan( + row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" + ).to_arrow() + assert arrow_table_neq["uuid_col"].to_pylist() == [ + uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226"), + uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"), + uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e"), + ] + except NotImplementedError as e: + assert "Filtering on UUID columns is not supported" in str(e) @pytest.mark.integration @@ -840,14 +843,11 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None: arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() assert arrow_table_eq["fixed_col"].to_pylist() == [b"1234567890123456789012345"] - arrow_table_neq = fixed_table.scan( - row_filter=And( - NotEqualTo("fixed_col", b"1234567890123456789012345"), NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") - ) - ).to_arrow() + arrow_table_neq = fixed_table.scan(row_filter=NotEqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() assert arrow_table_neq["fixed_col"].to_pylist() == [ b"1231231231231231231231231", b"12345678901234567ass12345", + b"asdasasdads12312312312111", b"qweeqwwqq1231231231231111", ] From c5406d45037bc37185d9f077b5a617d5d823e116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Anast=C3=A1cio?= Date: Tue, 26 May 2026 22:46:57 -0300 Subject: [PATCH 2/2] Handle UUID filter failures at scanner boundary --- pyiceberg/io/pyarrow.py | 46 ++++++++++------- tests/integration/test_reads.py | 32 ++++++------ tests/io/test_pyarrow.py | 91 +++++++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 35 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 2e811dbedd..4a048b91c2 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -38,6 +38,7 @@ import warnings from abc import ABC, abstractmethod from collections.abc import Callable, Iterable, Iterator +from contextlib import contextmanager from copy import copy from dataclasses import dataclass from enum import Enum @@ -203,10 +204,6 @@ MAP_VALUE_NAME = "value" DOC = "doc" UTC_ALIASES = {"UTC", "+00:00", "Etc/UTC", "Z"} -UUID_FILTER_NOT_SUPPORTED_ERROR_MESSAGE = ( - f"Filtering on UUID columns is not supported by the installed PyArrow version ({pa.__version__})" -) - T = TypeVar("T") @@ -1605,6 +1602,19 @@ def _get_column_projection_values( return projected_missing_fields +@contextmanager +def _handle_unsupported_pyarrow_uuid_filter() -> Iterator[None]: + try: + yield + except pyarrow.lib.ArrowNotImplementedError as e: + if "arrow.uuid" in str(e): + raise NotImplementedError( + "Filtering on UUID columns is not supported. " + "See https://github.com/apache/iceberg-python/issues/2372 for context." + ) from e + raise + + def _task_to_record_batches( io: FileIO, task: FileScanTask, @@ -1645,23 +1655,19 @@ def _task_to_record_batches( bound_row_filter, file_schema, case_sensitive=case_sensitive, projected_field_values=projected_missing_fields ) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) - try: - pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) - except pyarrow.lib.ArrowNotImplementedError as e: - if "arrow.uuid" in str(e): - raise NotImplementedError(UUID_FILTER_NOT_SUPPORTED_ERROR_MESSAGE) from e - raise + pyarrow_filter = expression_to_pyarrow(bound_file_filter, file_schema) file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - fragment_scanner = ds.Scanner.from_fragment( - fragment=fragment, - schema=physical_schema, - # This will push down the query to Arrow. - # But in case there are positional deletes, we have to apply them first - filter=pyarrow_filter if not positional_deletes else None, - columns=[col.name for col in file_project_schema.columns], - ) + with _handle_unsupported_pyarrow_uuid_filter(): + fragment_scanner = ds.Scanner.from_fragment( + fragment=fragment, + schema=physical_schema, + # This will push down the query to Arrow. + # But in case there are positional deletes, we have to apply them first + filter=pyarrow_filter if not positional_deletes else None, + columns=[col.name for col in file_project_schema.columns], + ) next_index = 0 batches = fragment_scanner.to_batches() @@ -1683,7 +1689,9 @@ def _task_to_record_batches( if pyarrow_filter is not None: # Temporary fix until PyArrow 21 is released ( https://github.com/apache/arrow/pull/46057 ) table = pa.Table.from_batches([current_batch]) - table = table.filter(pyarrow_filter) + with _handle_unsupported_pyarrow_uuid_filter(): + table = table.filter(pyarrow_filter) + # skip empty batches if table.num_rows == 0: continue diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 6a60c0bfa2..6c8b4a20a7 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -820,20 +820,17 @@ def test_partitioned_tables(catalog: Catalog) -> None: @pytest.mark.parametrize("catalog", [lf("session_catalog_hive"), lf("session_catalog")]) def test_unpartitioned_uuid_table(catalog: Catalog) -> None: unpartitioned_uuid = catalog.load_table("default.test_uuid_and_fixed_unpartitioned") - try: - arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() - assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")] - - arrow_table_neq = unpartitioned_uuid.scan( - row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" - ).to_arrow() - assert arrow_table_neq["uuid_col"].to_pylist() == [ - uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226"), - uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"), - uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e"), - ] - except NotImplementedError as e: - assert "Filtering on UUID columns is not supported" in str(e) + arrow_table_eq = unpartitioned_uuid.scan(row_filter="uuid_col == '102cb62f-e6f8-4eb0-9973-d9b012ff0967'").to_arrow() + assert arrow_table_eq["uuid_col"].to_pylist() == [uuid.UUID("102cb62f-e6f8-4eb0-9973-d9b012ff0967")] + + arrow_table_neq = unpartitioned_uuid.scan( + row_filter="uuid_col != '102cb62f-e6f8-4eb0-9973-d9b012ff0967' and uuid_col != '639cccce-c9d2-494a-a78c-278ab234f024'" + ).to_arrow() + assert arrow_table_neq["uuid_col"].to_pylist() == [ + uuid.UUID("ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226"), + uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b"), + uuid.UUID("923dae77-83d6-47cd-b4b0-d383e64ee57e"), + ] @pytest.mark.integration @@ -843,11 +840,14 @@ def test_unpartitioned_fixed_table(catalog: Catalog) -> None: arrow_table_eq = fixed_table.scan(row_filter=EqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() assert arrow_table_eq["fixed_col"].to_pylist() == [b"1234567890123456789012345"] - arrow_table_neq = fixed_table.scan(row_filter=NotEqualTo("fixed_col", b"1234567890123456789012345")).to_arrow() + arrow_table_neq = fixed_table.scan( + row_filter=And( + NotEqualTo("fixed_col", b"1234567890123456789012345"), NotEqualTo("uuid_col", "c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + ) + ).to_arrow() assert arrow_table_neq["fixed_col"].to_pylist() == [ b"1231231231231231231231231", b"12345678901234567ass12345", - b"asdasasdads12312312312111", b"qweeqwwqq1231231231231111", ] diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 2170741bdd..cb113f7a69 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -57,6 +57,7 @@ BoundStartsWith, GreaterThan, Not, + NotEqualTo, Or, ) from pyiceberg.expressions.literals import literal @@ -113,6 +114,7 @@ TimestampType, TimestamptzType, TimeType, + UUIDType, ) from tests.catalog.test_base import InMemoryCatalog from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES @@ -1156,6 +1158,95 @@ def _set_spec_id(datafile: DataFile) -> DataFile: ) +def test_projection_uuid_filter_with_physical_fixed_uuid(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = pa.schema( + [ + pa.field("uuid_col", pa.binary(16), metadata={PYARROW_PARQUET_FIELD_ID_KEY: b"1"}), + ], + metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}, + ) + filepath = _write_table_to_file( + f"file:{tmpdir}/uuid_fixed.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.binary(16))], schema=pyarrow_schema), + ) + + actual = project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value))) + + assert actual["uuid_col"].to_pylist() == [] + + +def test_projection_uuid_filter_with_arrow_uuid_raises_public_error_from_scanner(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}) + filepath = _write_table_to_file( + f"file:{tmpdir}/uuid_logical.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema), + ) + + with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info: + project(schema, [filepath], expr=NotEqualTo("uuid_col", str(uuid_value))) + + assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value) + + +def test_projection_uuid_filter_with_arrow_uuid_and_positional_deletes_raises_public_error(tmpdir: str) -> None: + schema = Schema(NestedField(1, "uuid_col", UUIDType(), required=False)) + uuid_value = uuid.UUID("c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b") + pyarrow_schema = schema_to_pyarrow(schema, metadata={ICEBERG_SCHEMA: bytes(schema.model_dump_json(), UTF8)}) + data_file_path = _write_table_to_file( + f"file:{tmpdir}/uuid_logical_with_delete.parquet", + pyarrow_schema, + pa.Table.from_arrays([pa.array([uuid_value.bytes], type=pa.uuid())], schema=pyarrow_schema), + ) + delete_file_path = f"{tmpdir}/uuid_position_deletes.parquet" + pq.write_table(pa.table({"file_path": [data_file_path], "pos": [1]}), delete_file_path) + + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=data_file_path, + file_format=FileFormat.PARQUET, + partition={}, + record_count=1, + file_size_in_bytes=1, + ) + data_file.spec_id = 0 + + with pytest.raises(NotImplementedError, match="Filtering on UUID columns is not supported") as exc_info: + ArrowScan( + table_metadata=TableMetadataV2( + location="file://a/b/", + last_column_id=1, + format_version=2, + schemas=[schema], + partition_specs=[PartitionSpec()], + ), + io=PyArrowFileIO(), + projected_schema=schema, + row_filter=NotEqualTo("uuid_col", str(uuid_value)), + case_sensitive=True, + ).to_table( + tasks=[ + FileScanTask( + data_file=data_file, + delete_files={ + DataFile.from_args( + content=DataFileContent.POSITION_DELETES, + file_path=delete_file_path, + file_format=FileFormat.PARQUET, + ) + }, + ) + ] + ) + + assert "https://github.com/apache/iceberg-python/issues/2372" in str(exc_info.value) + + def test_projection_add_column(file_int: str) -> None: schema = Schema( # All new IDs