diff --git a/tests/table/test_upsert.py b/tests/table/test_upsert.py index 08f90c6600..9328f48572 100644 --- a/tests/table/test_upsert.py +++ b/tests/table/test_upsert.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import datetime from pathlib import PosixPath import pyarrow as pa @@ -26,11 +27,13 @@ from pyiceberg.expressions import AlwaysTrue, And, EqualTo, Reference from pyiceberg.expressions.literals import LongLiteral from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table, UpsertResult from pyiceberg.table.snapshots import Operation from pyiceberg.table.upsert_util import create_match_filter -from pyiceberg.types import IntegerType, NestedField, StringType, StructType +from pyiceberg.transforms import IdentityTransform +from pyiceberg.types import DateType, IntegerType, NestedField, StringType, StructType from tests.catalog.test_base import InMemoryCatalog @@ -714,6 +717,48 @@ def test_upsert_with_nulls(catalog: Catalog) -> None: ) +def test_upsert_updates_existing_row_when_non_join_partition_value_changes(catalog: Catalog) -> None: + identifier = "default.test_upsert_non_join_partition_value_change" + _drop_table(catalog, identifier) + + schema = Schema( + NestedField(1, "order_id", IntegerType(), required=True), + NestedField(2, "order_date", DateType(), required=True), + NestedField(3, "order_type", StringType(), required=True), + ) + spec = PartitionSpec(PartitionField(source_id=2, field_id=1000, transform=IdentityTransform(), name="order_date")) + table = catalog.create_table(identifier, schema=schema, partition_spec=spec) + + arrow_schema = pa.schema( + [ + pa.field("order_id", pa.int32(), nullable=False), + pa.field("order_date", pa.date32(), nullable=False), + pa.field("order_type", pa.string(), nullable=False), + ] + ) + table.append( + pa.Table.from_pylist( + [{"order_id": 1, "order_date": datetime.date(2026, 1, 1), "order_type": "A"}], + schema=arrow_schema, + ) + ) + + result = table.upsert( + pa.Table.from_pylist( + [{"order_id": 1, "order_date": datetime.date(2026, 1, 2), "order_type": "B"}], + schema=arrow_schema, + ), + join_cols=["order_id"], + ) + + assert result.rows_updated == 1 + assert result.rows_inserted == 0 + assert table.scan().to_arrow() == pa.Table.from_pylist( + [{"order_id": 1, "order_date": datetime.date(2026, 1, 2), "order_type": "B"}], + schema=arrow_schema, + ) + + def test_transaction(catalog: Catalog) -> None: """Test the upsert within a Transaction. Make sure that if something fails the entire Transaction is rolled back."""