From ccfcbcd3a36b7687339807c1bc3dc7977ae60d83 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 5 Jan 2025 20:20:51 +0100 Subject: [PATCH 01/36] chore: scaffolding --- pyiceberg/table/__init__.py | 13 +++++++++++ pyiceberg/table/update/sort_order.py | 35 ++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 pyiceberg/table/update/sort_order.py diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2469a9ed7b..36b3d45520 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,6 +117,7 @@ UpdateSnapshot, _FastAppendFiles, ) +from pyiceberg.table.update.sort_order import UpdateSortOrder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( @@ -403,6 +404,10 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive case_sensitive=case_sensitive, name_mapping=self.table_metadata.name_mapping(), ) + + + def replace_sort_order(self) -> None: + ... def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. @@ -1050,6 +1055,14 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.name_mapping(), ) + def replace_sort_order(self) -> UpdateSortOrder: + """Create a new UpdateSortOrder to replace the sort order of this table. + + Returns: + A new UpdateSortOrder. + """ + return UpdateSortOrder(self) + def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py new file mode 100644 index 0000000000..a08bc3784b --- /dev/null +++ b/pyiceberg/table/update/sort_order.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, +) + +from pyiceberg.table.update import ( + UpdateTableMetadata, +) + +if TYPE_CHECKING: + from pyiceberg.table import Transaction + + +class UpdateSpec(UpdateTableMetadata["UpdateSpec"]): + _transaction: Transaction + + def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: + super().__init__(transaction) From 95da8f344434bc2aff6d4cf492488f161a6f1c6b Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 5 Jan 2025 20:24:30 +0100 Subject: [PATCH 02/36] chore: scaffolding --- pyiceberg/table/__init__.py | 8 +++----- pyiceberg/table/update/sort_order.py | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 36b3d45520..e16255603c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -404,10 +404,8 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive case_sensitive=case_sensitive, name_mapping=self.table_metadata.name_mapping(), ) - - - def replace_sort_order(self) -> None: - ... + + def replace_sort_order(self) -> None: ... 
def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. @@ -1061,7 +1059,7 @@ def replace_sort_order(self) -> UpdateSortOrder: Returns: A new UpdateSortOrder. """ - return UpdateSortOrder(self) + return UpdateSortOrder(transaction=Transaction(self, autocommit=True)) def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py index a08bc3784b..bba50e7645 100644 --- a/pyiceberg/table/update/sort_order.py +++ b/pyiceberg/table/update/sort_order.py @@ -16,11 +16,12 @@ # under the License. from __future__ import annotations -from typing import ( - TYPE_CHECKING, -) +from typing import TYPE_CHECKING, Tuple from pyiceberg.table.update import ( + TableRequirement, + TableUpdate, + UpdatesAndRequirements, UpdateTableMetadata, ) @@ -28,8 +29,15 @@ from pyiceberg.table import Transaction -class UpdateSpec(UpdateTableMetadata["UpdateSpec"]): +class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): _transaction: Transaction def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) + + def _commit(self) -> UpdatesAndRequirements: + """Apply the pending changes and commit.""" + requirements: Tuple[TableRequirement, ...] = () + updates: Tuple[TableUpdate, ...] 
= () + + return updates, requirements From 253967d0617fd6708104419092c787ff051f0cd3 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 5 Jan 2025 20:32:27 +0100 Subject: [PATCH 03/36] chore: add skeleton for asc/desc methods --- pyiceberg/table/update/sort_order.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py index bba50e7645..b194537422 100644 --- a/pyiceberg/table/update/sort_order.py +++ b/pyiceberg/table/update/sort_order.py @@ -16,8 +16,9 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Tuple, Any +from pyiceberg.transforms import Transform, IdentityTransform from pyiceberg.table.update import ( TableRequirement, TableUpdate, @@ -34,6 +35,12 @@ class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) + + def asc(self, source_column_name: str, transform: Transform[Any, Any] = IdentityTransform()) -> UpdateSortOrder: + ... + + def desc(self, source_column_name: str, transform: Transform[Any, Any] = IdentityTransform()) -> UpdateSortOrder: + ... def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" From 7b5a98e8e9a39c039bd3f0139b9b17bace4e9720 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 5 Jan 2025 20:56:17 +0100 Subject: [PATCH 04/36] chore: scaffolding --- pyiceberg/table/update/sort_order.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py index b194537422..6629ae3447 100644 --- a/pyiceberg/table/update/sort_order.py +++ b/pyiceberg/table/update/sort_order.py @@ -16,15 +16,16 @@ # under the License. 
from __future__ import annotations -from typing import TYPE_CHECKING, Tuple, Any +from typing import TYPE_CHECKING, Any, Tuple -from pyiceberg.transforms import Transform, IdentityTransform from pyiceberg.table.update import ( TableRequirement, TableUpdate, UpdatesAndRequirements, UpdateTableMetadata, ) +from pyiceberg.transforms import Transform +from pyiceberg.table.sorting import NullOrder if TYPE_CHECKING: from pyiceberg.table import Transaction @@ -35,12 +36,12 @@ class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) - - def asc(self, source_column_name: str, transform: Transform[Any, Any] = IdentityTransform()) -> UpdateSortOrder: - ... - - def desc(self, source_column_name: str, transform: Transform[Any, Any] = IdentityTransform()) -> UpdateSortOrder: - ... + + def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: + return self + + def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: + return self def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" From 304a806bbe4f7c562a37b51a41c96ebe7d33c2c9 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 5 Jan 2025 21:01:40 +0100 Subject: [PATCH 05/36] chore: change method names --- pyiceberg/table/__init__.py | 10 +++++----- pyiceberg/table/update/sort_order.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index e16255603c..817184f381 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ UpdateSnapshot, _FastAppendFiles, ) -from pyiceberg.table.update.sort_order import UpdateSortOrder +from pyiceberg.table.update.sort_order import SortOrderBuilder from pyiceberg.table.update.spec import UpdateSpec from 
pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( @@ -1053,13 +1053,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.name_mapping(), ) - def replace_sort_order(self) -> UpdateSortOrder: - """Create a new UpdateSortOrder to replace the sort order of this table. + def replace_sort_order(self) -> SortOrderBuilder: + """Create a new SortOrderBuilder to replace the sort order of this table. Returns: - A new UpdateSortOrder. + A new SortOrderBuilder. """ - return UpdateSortOrder(transaction=Transaction(self, autocommit=True)) + return SortOrderBuilder(transaction=Transaction(self, autocommit=True)) def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py index 6629ae3447..2512a0af03 100644 --- a/pyiceberg/table/update/sort_order.py +++ b/pyiceberg/table/update/sort_order.py @@ -31,7 +31,7 @@ from pyiceberg.table import Transaction -class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): +class SortOrderBuilder(UpdateTableMetadata["SortOrderBuilder"]): _transaction: Transaction def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: From 48ac5c079f7bf2f3f3daf5aa1af8993960ddb86d Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:40:58 +0100 Subject: [PATCH 06/36] chore: update methods --- pyiceberg/table/update/sorting.py | 103 ++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 pyiceberg/table/update/sorting.py diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py new file mode 100644 index 0000000000..dd2ded25cf --- /dev/null +++ b/pyiceberg/table/update/sorting.py @@ -0,0 +1,103 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Tuple, Dict, List + +from pyiceberg.table.update import ( + TableRequirement, + TableUpdate, + UpdatesAndRequirements, + UpdateTableMetadata, +) +from pyiceberg.transforms import Transform +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder + +if TYPE_CHECKING: + from pyiceberg.table import Transaction + + +class SortOrderBuilder: + + def __init__(self, case_sensitive: bool = True) -> None: + self._fields: List[SortField] = [] + self._case_sensitive = case_sensitive + + def add_sort_field( + self, + source_id: int, + transform: Transform[Any, Any], + direction: SortDirection, + null_order: NullOrder, + ) -> SortOrderBuilder: + self._fields.append( + SortField( + source_id=source_id, + transform=transform, + direction=direction, + null_order=null_order, + ) + ) + return self + + @property + def sort_order(self) -> SortOrder: # todo: add sort order id? 
+ return SortOrder(*self._fields) + + +class ReplaceSortOrder(UpdateTableMetadata["ReplaceSortOrder"]): + _transaction: Transaction + _builder: SortOrderBuilder + _last_assigned_order_id: int + _case_sensitive: bool + + def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: + super().__init__(transaction) + self._builder = SortOrderBuilder(case_sensitive) + self._case_sensitive = case_sensitive + self._last_sort_order_id = transaction.table_metadata.default_sort_order_id + + def _column_name_to_id(self, column_name: str) -> int: + return self._transaction.table_metadata.schema().find_field( + name_or_id=column_name, + case_sensitive=self._case_sensitive, + ).field_id + + def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> ReplaceSortOrder: + self._builder.add_sort_field( + source_id=self._column_name_to_id(source_column_name), + transform=transform, + direction=SortDirection.ASC, + null_order=null_order, + ) + return self + + def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> ReplaceSortOrder: + self._builder.add_sort_field( + source_id=self._column_name_to_id(source_column_name), + transform=transform, + direction=SortDirection.DESC, + null_order=null_order, + ) + return self + + def _commit(self) -> UpdatesAndRequirements: + """Apply the pending changes and commit.""" + requirements: Tuple[TableRequirement, ...] = () + updates: Tuple[TableUpdate, ...] 
= () + + return updates, requirements From a47067c38ed52eba908d5b4bfc8f5dd6af1ca992 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:41:14 +0100 Subject: [PATCH 07/36] chore: update methods --- pyiceberg/table/update/sort_order.py | 51 ---------------------------- 1 file changed, 51 deletions(-) delete mode 100644 pyiceberg/table/update/sort_order.py diff --git a/pyiceberg/table/update/sort_order.py b/pyiceberg/table/update/sort_order.py deleted file mode 100644 index 2512a0af03..0000000000 --- a/pyiceberg/table/update/sort_order.py +++ /dev/null @@ -1,51 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Tuple - -from pyiceberg.table.update import ( - TableRequirement, - TableUpdate, - UpdatesAndRequirements, - UpdateTableMetadata, -) -from pyiceberg.transforms import Transform -from pyiceberg.table.sorting import NullOrder - -if TYPE_CHECKING: - from pyiceberg.table import Transaction - - -class SortOrderBuilder(UpdateTableMetadata["SortOrderBuilder"]): - _transaction: Transaction - - def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: - super().__init__(transaction) - - def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - return self - - def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - return self - - def _commit(self) -> UpdatesAndRequirements: - """Apply the pending changes and commit.""" - requirements: Tuple[TableRequirement, ...] = () - updates: Tuple[TableUpdate, ...] 
= () - - return updates, requirements From c1ab2ecde26f91e4cdfc55e120b669122c3dbc30 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:41:26 +0100 Subject: [PATCH 08/36] chore: update imports --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 817184f381..6fca6d112f 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ UpdateSnapshot, _FastAppendFiles, ) -from pyiceberg.table.update.sort_order import SortOrderBuilder +from pyiceberg.table.update.sorting import SortOrderBuilder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( From 8f27d149719797eb52af11c07cd91423a55bf1d6 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:42:54 +0100 Subject: [PATCH 09/36] chore: stupid renames --- pyiceberg/table/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6fca6d112f..32df27da54 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ UpdateSnapshot, _FastAppendFiles, ) -from pyiceberg.table.update.sorting import SortOrderBuilder +from pyiceberg.table.update.sorting import ReplaceSortOrder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( @@ -1053,13 +1053,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.name_mapping(), ) - def replace_sort_order(self) -> SortOrderBuilder: - """Create a new SortOrderBuilder to replace the sort order of this table. + def replace_sort_order(self) -> ReplaceSortOrder: + """Create a new ReplaceSortOrder to replace the sort order of this table. Returns: - A new SortOrderBuilder. + A new ReplaceSortOrder. 
""" - return SortOrderBuilder(transaction=Transaction(self, autocommit=True)) + return ReplaceSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=True) def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" From d8720f2edfb13e188c4915ac6480316c9f6387d1 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:51:02 +0100 Subject: [PATCH 10/36] chore: lint --- pyiceberg/table/update/sorting.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index dd2ded25cf..a2ee561fe6 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -16,8 +16,9 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING, Any, Tuple, Dict, List +from typing import TYPE_CHECKING, Any, List, Tuple +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder from pyiceberg.table.update import ( TableRequirement, TableUpdate, @@ -25,18 +26,16 @@ UpdateTableMetadata, ) from pyiceberg.transforms import Transform -from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder if TYPE_CHECKING: from pyiceberg.table import Transaction class SortOrderBuilder: - def __init__(self, case_sensitive: bool = True) -> None: self._fields: List[SortField] = [] self._case_sensitive = case_sensitive - + def add_sort_field( self, source_id: int, @@ -53,9 +52,9 @@ def add_sort_field( ) ) return self - + @property - def sort_order(self) -> SortOrder: # todo: add sort order id? + def sort_order(self) -> SortOrder: # todo: add sort order id? 
return SortOrder(*self._fields) @@ -72,10 +71,14 @@ def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> Non self._last_sort_order_id = transaction.table_metadata.default_sort_order_id def _column_name_to_id(self, column_name: str) -> int: - return self._transaction.table_metadata.schema().find_field( - name_or_id=column_name, - case_sensitive=self._case_sensitive, - ).field_id + return ( + self._transaction.table_metadata.schema() + .find_field( + name_or_id=column_name, + case_sensitive=self._case_sensitive, + ) + .field_id + ) def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> ReplaceSortOrder: self._builder.add_sort_field( From 90db60a1f175cc5e2ce90b22490f3cb7c6bcb41e Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Tue, 7 Jan 2025 21:53:01 +0100 Subject: [PATCH 11/36] chore: docstrings --- pyiceberg/table/update/sorting.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index a2ee561fe6..56ff6e6f76 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -43,6 +43,7 @@ def add_sort_field( direction: SortDirection, null_order: NullOrder, ) -> SortOrderBuilder: + """Add a sort field to the sort order list.""" self._fields.append( SortField( source_id=source_id, @@ -55,6 +56,7 @@ def add_sort_field( @property def sort_order(self) -> SortOrder: # todo: add sort order id? 
+ """Return the sort order.""" return SortOrder(*self._fields) @@ -66,7 +68,7 @@ class ReplaceSortOrder(UpdateTableMetadata["ReplaceSortOrder"]): def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) - self._builder = SortOrderBuilder(case_sensitive) + self._builder = SortOrderBuilder(case_sensitive=case_sensitive) self._case_sensitive = case_sensitive self._last_sort_order_id = transaction.table_metadata.default_sort_order_id From 1cd27293a45b25c7a76db3e7c4115d30dbd31e0e Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 21:35:24 +0100 Subject: [PATCH 12/36] test: add integration test for replace sort order --- tests/integration/test_sort_order_update.py | 553 ++++++++++++++++++++ 1 file changed, 553 insertions(+) create mode 100644 tests/integration/test_sort_order_update.py diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py new file mode 100644 index 0000000000..98804b1c35 --- /dev/null +++ b/tests/integration/test_sort_order_update.py @@ -0,0 +1,553 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# pylint:disable=redefined-outer-name + +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.transforms import ( + BucketTransform, + DayTransform, + HourTransform, + IdentityTransform, + MonthTransform, + TruncateTransform, + VoidTransform, + YearTransform, +) +from pyiceberg.types import ( + LongType, + NestedField, + StringType, + TimestampType, +) + + +def _simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table: + return _create_table_with_schema(catalog, table_schema_simple, "1") + + +def _table(catalog: Catalog) -> Table: + schema_with_timestamp = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "event_ts", TimestampType(), required=False), + NestedField(3, "str", StringType(), required=False), + ) + return _create_table_with_schema(catalog, schema_with_timestamp, "1") + + +def _table_v2(catalog: Catalog) -> Table: + schema_with_timestamp = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "event_ts", TimestampType(), required=False), + NestedField(3, "str", StringType(), required=False), + ) + return _create_table_with_schema(catalog, schema_with_timestamp, "2") + + +def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table: + tbl_name = "default.test_schema_evolution" + try: + catalog.drop_table(tbl_name) + except NoSuchTableError: + pass + return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) # pytest.lazy_fixture("session_catalog_hive"), +def test_sort_order_builder(catalog: Catalog, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple) + r 
= simple_table.replace_sort_order() + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_year(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", YearTransform(), "year_transform").commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "year_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_year_generates_default_name(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", YearTransform()).commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "event_ts_year")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_month(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", MonthTransform(), "month_transform").commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "month_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_month_generates_default_name(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", MonthTransform()).commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "event_ts_month")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_day(catalog: 
Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", DayTransform(), "day_transform").commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "day_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_day_generates_default_name(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", DayTransform()).commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "event_ts_day")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_hour(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", HourTransform(), "hour_transform").commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "hour_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_hour_generates_default_name(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", HourTransform()).commit() +# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "event_ts_hour")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_bucket(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", BucketTransform(12), "bucket_transform").commit() +# 
_validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "bucket_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_bucket_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", BucketTransform(12)).commit() +# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "foo_bucket_12")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_truncate(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", TruncateTransform(1), "truncate_transform").commit() +# _validate_new_partition_fields( +# simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "truncate_transform") +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_truncate_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", TruncateTransform(1)).commit() +# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "foo_trunc_1")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_multiple_adds(catalog: Catalog) -> None: +# table = 
_table(catalog) +# table.update_spec().add_identity("id").add_field("event_ts", HourTransform(), "hourly_partitioned").add_field( +# "str", TruncateTransform(2), "truncate_str" +# ).commit() +# _validate_new_partition_fields( +# table, +# 1002, +# 1, +# 1002, +# PartitionField(1, 1000, IdentityTransform(), "id"), +# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), +# PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_void(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", VoidTransform(), "void_transform").commit() +# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "void_transform")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_void_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") +# simple_table.update_spec().add_field("foo", VoidTransform()).commit() +# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "foo_null")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_hour_to_day(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("event_ts", DayTransform(), "daily_partitioned").commit() +# table.update_spec().add_field("event_ts", HourTransform(), "hourly_partitioned").commit() +# _validate_new_partition_fields( +# table, +# 1001, +# 2, +# 
1001, +# PartitionField(2, 1000, DayTransform(), "daily_partitioned"), +# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_add_multiple_buckets(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_field("id", BucketTransform(16)).add_field("id", BucketTransform(4)).commit() +# _validate_new_partition_fields( +# table, +# 1001, +# 1, +# 1001, +# PartitionField(1, 1000, BucketTransform(16), "id_bucket_16"), +# PartitionField(1, 1001, BucketTransform(4), "id_bucket_4"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_identity(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_identity("id").commit() +# table.update_spec().remove_field("id").commit() +# assert len(table.specs()) == 3 +# assert table.spec().spec_id == 2 +# assert table.spec() == PartitionSpec( +# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id"), spec_id=2 +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_identity_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# table_v2.update_spec().add_identity("id").commit() +# table_v2.update_spec().remove_field("id").commit() +# assert len(table_v2.specs()) == 2 +# assert table_v2.spec().spec_id == 0 +# assert table_v2.spec() == PartitionSpec(spec_id=0) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_and_add_identity(catalog: Catalog) -> None: +# table = _table(catalog) +# 
table.update_spec().add_identity("id").commit() +# table.update_spec().remove_field("id").commit() +# table.update_spec().add_identity("id").commit() + +# assert len(table.specs()) == 4 +# assert table.spec().spec_id == 3 +# assert table.spec() == PartitionSpec( +# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id_1000"), +# PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="id"), +# spec_id=3, +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_and_add_identity_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# table_v2.update_spec().add_identity("id").commit() +# table_v2.update_spec().remove_field("id").commit() +# table_v2.update_spec().add_identity("id").commit() + +# assert len(table_v2.specs()) == 2 +# assert table_v2.spec().spec_id == 1 +# assert table_v2.spec() == PartitionSpec( +# PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1 +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_bucket(catalog: Catalog) -> None: +# table = _table(catalog) +# with table.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table.update_spec() as remove: +# remove.remove_field("bucketed_id") + +# assert len(table.specs()) == 3 +# _validate_new_partition_fields( +# table, +# 1001, +# 2, +# 1001, +# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="bucketed_id"), +# PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), 
pytest.lazy_fixture("session_catalog")]) +# def test_remove_bucket_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# with table_v2.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table_v2.update_spec() as remove: +# remove.remove_field("bucketed_id") +# assert len(table_v2.specs()) == 3 +# _validate_new_partition_fields( +# table_v2, 1001, 2, 1001, PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts") +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_day(catalog: Catalog) -> None: +# table = _table(catalog) +# with table.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table.update_spec() as remove: +# remove.remove_field("day_ts") + +# assert len(table.specs()) == 3 +# _validate_new_partition_fields( +# table, +# 1001, +# 2, +# 1001, +# PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id"), +# PartitionField(source_id=2, field_id=1001, transform=VoidTransform(), name="day_ts"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_remove_day_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# with table_v2.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table_v2.update_spec() as remove: +# remove.remove_field("day_ts") +# assert len(table_v2.specs()) == 3 +# _validate_new_partition_fields( +# table_v2, 1000, 2, 1001, PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id") 
+# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_rename(catalog: Catalog) -> None: +# table = _table(catalog) +# table.update_spec().add_identity("id").commit() +# table.update_spec().rename_field("id", "sharded_id").commit() +# assert len(table.specs()) == 3 +# assert table.spec().spec_id == 2 +# _validate_new_partition_fields(table, 1000, 2, 1000, PartitionField(1, 1000, IdentityTransform(), "sharded_id")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_add_and_remove(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_identity("id").remove_field("id").commit() +# assert "Cannot delete newly added field id" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_add_redundant_time_partition(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_field("event_ts", YearTransform(), "year_transform").add_field( +# "event_ts", HourTransform(), "hour_transform" +# ).commit() +# assert "Cannot add time partition field: hour_transform conflicts with year_transform" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_delete_and_rename(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_identity("id").commit() +# table.update_spec().remove_field("id").rename_field("id", "sharded_id").commit() +# assert "Cannot 
delete and rename partition field id" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_rename_and_delete(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_identity("id").commit() +# table.update_spec().rename_field("id", "sharded_id").remove_field("id").commit() +# assert "Cannot rename and delete field id" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_add_same_tranform_for_same_field(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_field("str", TruncateTransform(4), "truncated_str").add_field( +# "str", TruncateTransform(4) +# ).commit() +# assert "Already added partition" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_add_same_field_multiple_times(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_field("id", IdentityTransform(), "duplicate").add_field( +# "id", IdentityTransform(), "duplicate" +# ).commit() +# assert "Already added partition" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_cannot_add_multiple_specs_same_name(catalog: Catalog) -> None: +# table = _table(catalog) +# with pytest.raises(ValueError) as exc_info: +# table.update_spec().add_field("id", IdentityTransform(), "duplicate").add_field( +# "event_ts", IdentityTransform(), 
"duplicate" +# ).commit() +# assert "Already added partition" in str(exc_info.value) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_change_specs_and_schema_transaction(catalog: Catalog) -> None: +# table = _table(catalog) +# with table.transaction() as transaction: +# with transaction.update_spec() as update_spec: +# update_spec.add_identity("id").add_field("event_ts", HourTransform(), "hourly_partitioned").add_field( +# "str", TruncateTransform(2), "truncate_str" +# ) + +# with transaction.update_schema() as update_schema: +# update_schema.add_column("col_string", StringType()) + +# _validate_new_partition_fields( +# table, +# 1002, +# 1, +# 1002, +# PartitionField(1, 1000, IdentityTransform(), "id"), +# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), +# PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), +# ) + +# assert table.schema() == Schema( +# NestedField(field_id=1, name="id", field_type=LongType(), required=False), +# NestedField(field_id=2, name="event_ts", field_type=TimestampType(), required=False), +# NestedField(field_id=3, name="str", field_type=StringType(), required=False), +# NestedField(field_id=4, name="col_string", field_type=StringType(), required=False), +# identifier_field_ids=[], +# ) +# assert table.schema().schema_id == 1 + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_multiple_adds_and_remove_v1(catalog: Catalog) -> None: +# table = _table(catalog) +# with table.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table.update_spec() as update: +# update.remove_field("day_ts").remove_field("bucketed_id") +# with table.update_spec() as update: +# update.add_field("str", 
TruncateTransform(2), "truncated_str") +# _validate_new_partition_fields( +# table, +# 1002, +# 3, +# 1002, +# PartitionField(1, 1000, VoidTransform(), "bucketed_id"), +# PartitionField(2, 1001, VoidTransform(), "day_ts"), +# PartitionField(3, 1002, TruncateTransform(2), "truncated_str"), +# ) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_multiple_adds_and_remove_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# with table_v2.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table_v2.update_spec() as update: +# update.remove_field("day_ts").remove_field("bucketed_id") +# with table_v2.update_spec() as update: +# update.add_field("str", TruncateTransform(2), "truncated_str") +# _validate_new_partition_fields(table_v2, 1002, 2, 1002, PartitionField(3, 1002, TruncateTransform(2), "truncated_str")) + + +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +# def test_multiple_remove_and_add_reuses_v2(catalog: Catalog) -> None: +# table_v2 = _table_v2(catalog) +# with table_v2.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# update.add_field("event_ts", DayTransform(), "day_ts") +# with table_v2.update_spec() as update: +# update.remove_field("day_ts").remove_field("bucketed_id") +# with table_v2.update_spec() as update: +# update.add_field("id", BucketTransform(16), "bucketed_id") +# _validate_new_partition_fields(table_v2, 1000, 2, 1001, PartitionField(1, 1000, BucketTransform(16), "bucketed_id")) + + +# def _validate_new_partition_fields( +# table: Table, +# expected_spec_last_assigned_field_id: int, +# expected_spec_id: int, +# expected_metadata_last_assigned_field_id: int, +# 
*expected_partition_fields: PartitionField, +# ) -> None: +# spec = table.spec() +# assert spec.spec_id == expected_spec_id +# assert spec.last_assigned_field_id == expected_spec_last_assigned_field_id +# assert table.last_partition_id() == expected_metadata_last_assigned_field_id +# assert len(spec.fields) == len(expected_partition_fields) +# for i in range(len(spec.fields)): +# assert spec.fields[i] == expected_partition_fields[i] From 0a1e781c2c642554bdaa926109bc3e6a1bded9f0 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 21:39:39 +0100 Subject: [PATCH 13/36] test: add test for lookup --- tests/integration/test_sort_order_update.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 98804b1c35..bdfd7e3b2e 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -72,6 +72,14 @@ def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple) + for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items(): + assert col_id == simple_table.replace_sort_order()._column_name_to_id(col_name) + + @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) # pytest.lazy_fixture("session_catalog_hive"), def test_sort_order_builder(catalog: Catalog, table_schema_simple: Schema) -> None: From a550ccbc8bd3348ac40f30ecaab452ebcf91d27b Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 21:48:45 +0100 Subject: [PATCH 14/36] refactor: 
add last sort order id --- tests/integration/test_sort_order_update.py | 23 ++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index bdfd7e3b2e..1c8cc0a1fd 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -23,6 +23,8 @@ from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.table.update.sorting import SortOrderBuilder from pyiceberg.transforms import ( BucketTransform, DayTransform, @@ -72,6 +74,17 @@ def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) +@pytest.mark.integration +def test_sort_order_builder() -> None: + builder = SortOrderBuilder(last_sort_order_id=0) + builder.add_sort_field(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST) + builder.add_sort_field(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST) + assert builder.sort_order == SortOrder( + SortField(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST), + SortField(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST), + ) + + @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: @@ -80,11 +93,11 @@ def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> assert col_id == simple_table.replace_sort_order()._column_name_to_id(col_name) -@pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) # 
pytest.lazy_fixture("session_catalog_hive"), -def test_sort_order_builder(catalog: Catalog, table_schema_simple: Schema) -> None: - simple_table = _simple_table(catalog, table_schema_simple) - r = simple_table.replace_sort_order() +# @pytest.mark.integration +# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) # pytest.lazy_fixture("session_catalog_hive"), +# def test_sort_order_builder(catalog: Catalog, table_schema_simple: Schema) -> None: +# simple_table = _simple_table(catalog, table_schema_simple) +# r = simple_table.replace_sort_order() # @pytest.mark.integration From 8b0925589f180a1edaa51d652124ec2ee4ae461c Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 21:54:28 +0100 Subject: [PATCH 15/36] refactor: add last sort order id and increment --- pyiceberg/table/update/sorting.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 56ff6e6f76..ccfa3c5915 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -32,9 +32,10 @@ class SortOrderBuilder: - def __init__(self, case_sensitive: bool = True) -> None: + def __init__(self, last_sort_order_id: int, case_sensitive: bool = True) -> None: self._fields: List[SortField] = [] self._case_sensitive = case_sensitive + self._last_sort_order_id = last_sort_order_id def add_sort_field( self, @@ -57,7 +58,7 @@ def add_sort_field( @property def sort_order(self) -> SortOrder: # todo: add sort order id? 
"""Return the sort order.""" - return SortOrder(*self._fields) + return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) class ReplaceSortOrder(UpdateTableMetadata["ReplaceSortOrder"]): @@ -68,9 +69,11 @@ class ReplaceSortOrder(UpdateTableMetadata["ReplaceSortOrder"]): def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) - self._builder = SortOrderBuilder(case_sensitive=case_sensitive) + self._builder = SortOrderBuilder( + case_sensitive=case_sensitive, + last_sort_order_id=transaction.table_metadata.default_sort_order_id, + ) self._case_sensitive = case_sensitive - self._last_sort_order_id = transaction.table_metadata.default_sort_order_id def _column_name_to_id(self, column_name: str) -> int: return ( From 67b9e527e52f6f42620ff048911ab18917952102 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 22:02:52 +0100 Subject: [PATCH 16/36] chore: add imports --- pyiceberg/table/update/sorting.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index ccfa3c5915..ec1f31dbf7 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -18,12 +18,14 @@ from typing import TYPE_CHECKING, Any, List, Tuple +from pyiceberg.table import AddSortOrderUpdate, SetDefaultSortOrderUpdate from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder from pyiceberg.table.update import ( TableRequirement, TableUpdate, UpdatesAndRequirements, UpdateTableMetadata, + AssertDefaultSortOrderId ) from pyiceberg.transforms import Transform @@ -56,7 +58,7 @@ def add_sort_field( return self @property - def sort_order(self) -> SortOrder: # todo: add sort order id? 
+ def sort_order(self) -> SortOrder: """Return the sort order.""" return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) From dcaa63f05b56781cf12148c7ea679812e76cc2df Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 22:11:39 +0100 Subject: [PATCH 17/36] feat: add apply and commit methods --- pyiceberg/table/update/sorting.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index ec1f31dbf7..7bb0c67f74 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -105,9 +105,24 @@ def desc(self, source_column_name: str, transform: Transform[Any, Any], null_ord ) return self + def _apply(self) -> SortOrder: + return self._builder.sort_order + def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" + new_sort_order = self._apply() requirements: Tuple[TableRequirement, ...] = () updates: Tuple[TableUpdate, ...] = () + if self._transaction.table_metadata.default_sort_order_id != new_sort_order.order_id: + updates = ( + AddSortOrderUpdate(sort_order=new_sort_order), + SetDefaultSortOrderUpdate(sort_order_id=-1) + ) + else: + updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),) + + required_last_assigned_sort_order_id = self._transaction.table_metadata.default_sort_order_id + requirements = (AssertDefaultSortOrderId(default_sort_order_id=required_last_assigned_sort_order_id),) + return updates, requirements From ced6a4b3e8bd603e75eca830d8b61b6c5d6f93b9 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 22:19:16 +0100 Subject: [PATCH 18/36] test: remove spec stuff --- tests/integration/test_sort_order_update.py | 501 +------------------- 1 file changed, 22 insertions(+), 479 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 1c8cc0a1fd..39a223c054 100644 --- 
a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -93,482 +93,25 @@ def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> assert col_id == simple_table.replace_sort_order()._column_name_to_id(col_name) -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog")]) # pytest.lazy_fixture("session_catalog_hive"), -# def test_sort_order_builder(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _simple_table(catalog, table_schema_simple) -# r = simple_table.replace_sort_order() - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_year(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", YearTransform(), "year_transform").commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "year_transform")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_year_generates_default_name(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", YearTransform()).commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, YearTransform(), "event_ts_year")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_month(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", MonthTransform(), "month_transform").commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "month_transform")) - - -# @pytest.mark.integration -# 
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_month_generates_default_name(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", MonthTransform()).commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, MonthTransform(), "event_ts_month")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_day(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", DayTransform(), "day_transform").commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "day_transform")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_day_generates_default_name(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", DayTransform()).commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, DayTransform(), "event_ts_day")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_hour(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", HourTransform(), "hour_transform").commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "hour_transform")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_hour_generates_default_name(catalog: Catalog) -> None: -# table = _table(catalog) -# 
table.update_spec().add_field("event_ts", HourTransform()).commit() -# _validate_new_partition_fields(table, 1000, 1, 1000, PartitionField(2, 1000, HourTransform(), "event_ts_hour")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_bucket(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", BucketTransform(12), "bucket_transform").commit() -# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "bucket_transform")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_bucket_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", BucketTransform(12)).commit() -# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, BucketTransform(12), "foo_bucket_12")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_truncate(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", TruncateTransform(1), "truncate_transform").commit() -# _validate_new_partition_fields( -# simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "truncate_transform") -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def 
test_add_truncate_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", TruncateTransform(1)).commit() -# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, TruncateTransform(1), "foo_trunc_1")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_multiple_adds(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_identity("id").add_field("event_ts", HourTransform(), "hourly_partitioned").add_field( -# "str", TruncateTransform(2), "truncate_str" -# ).commit() -# _validate_new_partition_fields( -# table, -# 1002, -# 1, -# 1002, -# PartitionField(1, 1000, IdentityTransform(), "id"), -# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), -# PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_void(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", VoidTransform(), "void_transform").commit() -# _validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "void_transform")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_void_generates_default_name(catalog: Catalog, table_schema_simple: Schema) -> None: -# simple_table = _create_table_with_schema(catalog, table_schema_simple, "1") -# simple_table.update_spec().add_field("foo", VoidTransform()).commit() -# 
_validate_new_partition_fields(simple_table, 1000, 1, 1000, PartitionField(1, 1000, VoidTransform(), "foo_null")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_hour_to_day(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("event_ts", DayTransform(), "daily_partitioned").commit() -# table.update_spec().add_field("event_ts", HourTransform(), "hourly_partitioned").commit() -# _validate_new_partition_fields( -# table, -# 1001, -# 2, -# 1001, -# PartitionField(2, 1000, DayTransform(), "daily_partitioned"), -# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_add_multiple_buckets(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_field("id", BucketTransform(16)).add_field("id", BucketTransform(4)).commit() -# _validate_new_partition_fields( -# table, -# 1001, -# 1, -# 1001, -# PartitionField(1, 1000, BucketTransform(16), "id_bucket_16"), -# PartitionField(1, 1001, BucketTransform(4), "id_bucket_4"), -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_identity(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_identity("id").commit() -# table.update_spec().remove_field("id").commit() -# assert len(table.specs()) == 3 -# assert table.spec().spec_id == 2 -# assert table.spec() == PartitionSpec( -# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id"), spec_id=2 -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), 
pytest.lazy_fixture("session_catalog")]) -# def test_remove_identity_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# table_v2.update_spec().add_identity("id").commit() -# table_v2.update_spec().remove_field("id").commit() -# assert len(table_v2.specs()) == 2 -# assert table_v2.spec().spec_id == 0 -# assert table_v2.spec() == PartitionSpec(spec_id=0) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_and_add_identity(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_identity("id").commit() -# table.update_spec().remove_field("id").commit() -# table.update_spec().add_identity("id").commit() - -# assert len(table.specs()) == 4 -# assert table.spec().spec_id == 3 -# assert table.spec() == PartitionSpec( -# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="id_1000"), -# PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name="id"), -# spec_id=3, -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_and_add_identity_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# table_v2.update_spec().add_identity("id").commit() -# table_v2.update_spec().remove_field("id").commit() -# table_v2.update_spec().add_identity("id").commit() - -# assert len(table_v2.specs()) == 2 -# assert table_v2.spec().spec_id == 1 -# assert table_v2.spec() == PartitionSpec( -# PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id"), spec_id=1 -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_bucket(catalog: Catalog) -> None: -# table = _table(catalog) -# with table.update_spec() as 
update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table.update_spec() as remove: -# remove.remove_field("bucketed_id") - -# assert len(table.specs()) == 3 -# _validate_new_partition_fields( -# table, -# 1001, -# 2, -# 1001, -# PartitionField(source_id=1, field_id=1000, transform=VoidTransform(), name="bucketed_id"), -# PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts"), -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_bucket_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# with table_v2.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table_v2.update_spec() as remove: -# remove.remove_field("bucketed_id") -# assert len(table_v2.specs()) == 3 -# _validate_new_partition_fields( -# table_v2, 1001, 2, 1001, PartitionField(source_id=2, field_id=1001, transform=DayTransform(), name="day_ts") -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_day(catalog: Catalog) -> None: -# table = _table(catalog) -# with table.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table.update_spec() as remove: -# remove.remove_field("day_ts") - -# assert len(table.specs()) == 3 -# _validate_new_partition_fields( -# table, -# 1001, -# 2, -# 1001, -# PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id"), -# PartitionField(source_id=2, field_id=1001, transform=VoidTransform(), name="day_ts"), -# ) - - -# @pytest.mark.integration -# 
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_remove_day_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# with table_v2.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table_v2.update_spec() as remove: -# remove.remove_field("day_ts") -# assert len(table_v2.specs()) == 3 -# _validate_new_partition_fields( -# table_v2, 1000, 2, 1001, PartitionField(source_id=1, field_id=1000, transform=BucketTransform(16), name="bucketed_id") -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_rename(catalog: Catalog) -> None: -# table = _table(catalog) -# table.update_spec().add_identity("id").commit() -# table.update_spec().rename_field("id", "sharded_id").commit() -# assert len(table.specs()) == 3 -# assert table.spec().spec_id == 2 -# _validate_new_partition_fields(table, 1000, 2, 1000, PartitionField(1, 1000, IdentityTransform(), "sharded_id")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_add_and_remove(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_identity("id").remove_field("id").commit() -# assert "Cannot delete newly added field id" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_add_redundant_time_partition(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_field("event_ts", YearTransform(), 
"year_transform").add_field( -# "event_ts", HourTransform(), "hour_transform" -# ).commit() -# assert "Cannot add time partition field: hour_transform conflicts with year_transform" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_delete_and_rename(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_identity("id").commit() -# table.update_spec().remove_field("id").rename_field("id", "sharded_id").commit() -# assert "Cannot delete and rename partition field id" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_rename_and_delete(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_identity("id").commit() -# table.update_spec().rename_field("id", "sharded_id").remove_field("id").commit() -# assert "Cannot rename and delete field id" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_add_same_tranform_for_same_field(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_field("str", TruncateTransform(4), "truncated_str").add_field( -# "str", TruncateTransform(4) -# ).commit() -# assert "Already added partition" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_add_same_field_multiple_times(catalog: Catalog) -> None: -# table = _table(catalog) -# with 
pytest.raises(ValueError) as exc_info: -# table.update_spec().add_field("id", IdentityTransform(), "duplicate").add_field( -# "id", IdentityTransform(), "duplicate" -# ).commit() -# assert "Already added partition" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_cannot_add_multiple_specs_same_name(catalog: Catalog) -> None: -# table = _table(catalog) -# with pytest.raises(ValueError) as exc_info: -# table.update_spec().add_field("id", IdentityTransform(), "duplicate").add_field( -# "event_ts", IdentityTransform(), "duplicate" -# ).commit() -# assert "Already added partition" in str(exc_info.value) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_change_specs_and_schema_transaction(catalog: Catalog) -> None: -# table = _table(catalog) -# with table.transaction() as transaction: -# with transaction.update_spec() as update_spec: -# update_spec.add_identity("id").add_field("event_ts", HourTransform(), "hourly_partitioned").add_field( -# "str", TruncateTransform(2), "truncate_str" -# ) - -# with transaction.update_schema() as update_schema: -# update_schema.add_column("col_string", StringType()) - -# _validate_new_partition_fields( -# table, -# 1002, -# 1, -# 1002, -# PartitionField(1, 1000, IdentityTransform(), "id"), -# PartitionField(2, 1001, HourTransform(), "hourly_partitioned"), -# PartitionField(3, 1002, TruncateTransform(2), "truncate_str"), -# ) - -# assert table.schema() == Schema( -# NestedField(field_id=1, name="id", field_type=LongType(), required=False), -# NestedField(field_id=2, name="event_ts", field_type=TimestampType(), required=False), -# NestedField(field_id=3, name="str", field_type=StringType(), required=False), -# NestedField(field_id=4, name="col_string", field_type=StringType(), 
required=False), -# identifier_field_ids=[], -# ) -# assert table.schema().schema_id == 1 - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_multiple_adds_and_remove_v1(catalog: Catalog) -> None: -# table = _table(catalog) -# with table.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table.update_spec() as update: -# update.remove_field("day_ts").remove_field("bucketed_id") -# with table.update_spec() as update: -# update.add_field("str", TruncateTransform(2), "truncated_str") -# _validate_new_partition_fields( -# table, -# 1002, -# 3, -# 1002, -# PartitionField(1, 1000, VoidTransform(), "bucketed_id"), -# PartitionField(2, 1001, VoidTransform(), "day_ts"), -# PartitionField(3, 1002, TruncateTransform(2), "truncated_str"), -# ) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_multiple_adds_and_remove_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# with table_v2.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table_v2.update_spec() as update: -# update.remove_field("day_ts").remove_field("bucketed_id") -# with table_v2.update_spec() as update: -# update.add_field("str", TruncateTransform(2), "truncated_str") -# _validate_new_partition_fields(table_v2, 1002, 2, 1002, PartitionField(3, 1002, TruncateTransform(2), "truncated_str")) - - -# @pytest.mark.integration -# @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) -# def test_multiple_remove_and_add_reuses_v2(catalog: Catalog) -> None: -# table_v2 = _table_v2(catalog) -# with 
table_v2.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# update.add_field("event_ts", DayTransform(), "day_ts") -# with table_v2.update_spec() as update: -# update.remove_field("day_ts").remove_field("bucketed_id") -# with table_v2.update_spec() as update: -# update.add_field("id", BucketTransform(16), "bucketed_id") -# _validate_new_partition_fields(table_v2, 1000, 2, 1001, PartitionField(1, 1000, BucketTransform(16), "bucketed_id")) - - -# def _validate_new_partition_fields( -# table: Table, -# expected_spec_last_assigned_field_id: int, -# expected_spec_id: int, -# expected_metadata_last_assigned_field_id: int, -# *expected_partition_fields: PartitionField, -# ) -> None: -# spec = table.spec() -# assert spec.spec_id == expected_spec_id -# assert spec.last_assigned_field_id == expected_spec_last_assigned_field_id -# assert table.last_partition_id() == expected_metadata_last_assigned_field_id -# assert len(spec.fields) == len(expected_partition_fields) -# for i in range(len(spec.fields)): -# assert spec.fields[i] == expected_partition_fields[i] +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema): + simple_table = _simple_table(catalog, table_schema_simple) + simple_table.replace_sort_order().asc( + "foo", IdentityTransform(), NullOrder.NULLS_FIRST + ).desc("bar", IdentityTransform(), NullOrder.NULLS_LAST).commit() + assert simple_table.sort_order() == SortOrder( + SortField( + source_id=1, + transform=IdentityTransform(), + direction=SortDirection.ASC, + null_order=NullOrder.NULLS_FIRST + ), + SortField( + source_id=2, + transform=IdentityTransform(), + direction=SortDirection.DESC, + null_order=NullOrder.NULLS_LAST + ), + order_id=1 + ) From 43e09a3bafab4440c19cc4f44ad9ecdfc6b4198b Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Wed, 8 Jan 2025 
22:19:57 +0100 Subject: [PATCH 19/36] chore: remove unused import --- tests/integration/test_sort_order_update.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 39a223c054..001f3e42c0 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -20,20 +20,12 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder from pyiceberg.table.update.sorting import SortOrderBuilder from pyiceberg.transforms import ( - BucketTransform, - DayTransform, - HourTransform, IdentityTransform, - MonthTransform, - TruncateTransform, - VoidTransform, - YearTransform, ) from pyiceberg.types import ( LongType, From b460c34df5774e11cf374555338393a4b8a6948b Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 10 Jan 2025 17:18:25 +0100 Subject: [PATCH 20/36] chore: add ReplaceSortOrder to the Transaction class --- pyiceberg/table/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 32df27da54..9f9efcea6b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -405,7 +405,17 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.table_metadata.name_mapping(), ) - def replace_sort_order(self) -> None: ... + def replace_sort_order(self, case_sensitive: bool = True) -> ReplaceSortOrder: + """Create a new ReplaceSortOrder to replace the sort order of this table. + + Returns: + A new ReplaceSortOrder. 
+ """ + + return ReplaceSortOrder( + self, + case_sensitive=case_sensitive, + ) def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> UpdateSnapshot: """Create a new UpdateSnapshot to produce a new snapshot for the table. From 190071fb3cdd70708bd5990f74d0e6957b7ecf5e Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 10 Jan 2025 17:22:52 +0100 Subject: [PATCH 21/36] chore: lint --- pyiceberg/table/__init__.py | 8 ++++--- pyiceberg/table/update/sorting.py | 14 +++++------ tests/integration/test_sort_order_update.py | 26 +++++++-------------- 3 files changed, 19 insertions(+), 29 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 9f9efcea6b..ab2c7570f2 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -407,11 +407,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive def replace_sort_order(self, case_sensitive: bool = True) -> ReplaceSortOrder: """Create a new ReplaceSortOrder to replace the sort order of this table. - + + Args: + case_sensitive: If field names are case-sensitive. + Returns: - A new ReplaceSortOrder. + A new ReplaceSortOrder. 
""" - return ReplaceSortOrder( self, case_sensitive=case_sensitive, diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 7bb0c67f74..4179b3e1d9 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -18,14 +18,15 @@ from typing import TYPE_CHECKING, Any, List, Tuple -from pyiceberg.table import AddSortOrderUpdate, SetDefaultSortOrderUpdate from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder from pyiceberg.table.update import ( + AddSortOrderUpdate, + AssertDefaultSortOrderId, + SetDefaultSortOrderUpdate, TableRequirement, TableUpdate, UpdatesAndRequirements, UpdateTableMetadata, - AssertDefaultSortOrderId ) from pyiceberg.transforms import Transform @@ -34,7 +35,7 @@ class SortOrderBuilder: - def __init__(self, last_sort_order_id: int, case_sensitive: bool = True) -> None: + def __init__(self, last_sort_order_id: int, case_sensitive: bool = True) -> None: self._fields: List[SortField] = [] self._case_sensitive = case_sensitive self._last_sort_order_id = last_sort_order_id @@ -115,13 +116,10 @@ def _commit(self) -> UpdatesAndRequirements: updates: Tuple[TableUpdate, ...] 
= () if self._transaction.table_metadata.default_sort_order_id != new_sort_order.order_id: - updates = ( - AddSortOrderUpdate(sort_order=new_sort_order), - SetDefaultSortOrderUpdate(sort_order_id=-1) - ) + updates = (AddSortOrderUpdate(sort_order=new_sort_order), SetDefaultSortOrderUpdate(sort_order_id=-1)) else: updates = (SetDefaultSortOrderUpdate(sort_order_id=new_sort_order.order_id),) - + required_last_assigned_sort_order_id = self._transaction.table_metadata.default_sort_order_id requirements = (AssertDefaultSortOrderId(default_sort_order_id=required_last_assigned_sort_order_id),) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 001f3e42c0..4db7f89c46 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -78,7 +78,7 @@ def test_sort_order_builder() -> None: @pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple) for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items(): @@ -87,23 +87,13 @@ def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) -def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema): +def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple) - simple_table.replace_sort_order().asc( - "foo", IdentityTransform(), NullOrder.NULLS_FIRST - ).desc("bar", IdentityTransform(), 
NullOrder.NULLS_LAST).commit() + simple_table.replace_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc( + "bar", IdentityTransform(), NullOrder.NULLS_LAST + ).commit() assert simple_table.sort_order() == SortOrder( - SortField( - source_id=1, - transform=IdentityTransform(), - direction=SortDirection.ASC, - null_order=NullOrder.NULLS_FIRST - ), - SortField( - source_id=2, - transform=IdentityTransform(), - direction=SortDirection.DESC, - null_order=NullOrder.NULLS_LAST - ), - order_id=1 + SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), + SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST), + order_id=1, ) From ec5f711591241aaec0a3f80d7a76c9b1b25edac2 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:16:17 +0100 Subject: [PATCH 22/36] chore: renames (replace to update) --- pyiceberg/table/__init__.py | 14 +++++++------- pyiceberg/table/update/sorting.py | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f9d214b72d..effa1c06e6 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -117,7 +117,7 @@ UpdateSnapshot, _FastAppendFiles, ) -from pyiceberg.table.update.sorting import ReplaceSortOrder +from pyiceberg.table.update.sorting import UpdateSortOrder from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -414,16 +414,16 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.table_metadata.name_mapping(), ) - def replace_sort_order(self, case_sensitive: bool = True) -> ReplaceSortOrder: - """Create a new ReplaceSortOrder to replace the sort order of this table. 
+ def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder: + """Create a new UpdateSortOrder to update the sort order of this table. Args: case_sensitive: If field names are case-sensitive. Returns: - A new ReplaceSortOrder. + A new UpdateSortOrder. """ - return ReplaceSortOrder( + return UpdateSortOrder( self, case_sensitive=case_sensitive, ) @@ -1095,13 +1095,13 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.name_mapping(), ) - def replace_sort_order(self) -> ReplaceSortOrder: + def update_sort_order(self) -> UpdateSortOrder: """Create a new ReplaceSortOrder to replace the sort order of this table. Returns: A new ReplaceSortOrder. """ - return ReplaceSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=True) + return UpdateSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=True) def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 4179b3e1d9..0e8641a715 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -64,7 +64,7 @@ def sort_order(self) -> SortOrder: return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) -class ReplaceSortOrder(UpdateTableMetadata["ReplaceSortOrder"]): +class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): _transaction: Transaction _builder: SortOrderBuilder _last_assigned_order_id: int @@ -88,7 +88,7 @@ def _column_name_to_id(self, column_name: str) -> int: .field_id ) - def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> ReplaceSortOrder: + def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: self._builder.add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, @@ -97,7 +97,7 @@ def asc(self, 
source_column_name: str, transform: Transform[Any, Any], null_orde ) return self - def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> ReplaceSortOrder: + def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: self._builder.add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, From d69a07171b9608a787c4aece355577b688c1f26f Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:16:58 +0100 Subject: [PATCH 23/36] chore: renames (replace to update) --- tests/integration/test_sort_order_update.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 4db7f89c46..fe4e550c10 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -82,14 +82,14 @@ def test_sort_order_builder() -> None: def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple) for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items(): - assert col_id == simple_table.replace_sort_order()._column_name_to_id(col_name) + assert col_id == simple_table.update_sort_order()._column_name_to_id(col_name) @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple) - simple_table.replace_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc( + simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc( "bar", IdentityTransform(), NullOrder.NULLS_LAST ).commit() assert simple_table.sort_order() == SortOrder( From 
b5a5bd83903df232d60ee34619db0f5073cfe825 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:27:47 +0100 Subject: [PATCH 24/36] test: add test updating sort order --- tests/integration/test_sort_order_update.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index fe4e550c10..c41edd5966 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -97,3 +97,23 @@ def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> No SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST), order_id=1, ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +def test_replace_existing_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple) + simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit() + assert simple_table.sort_order() == SortOrder( + SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), + order_id=1, + ) + simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc( + "bar", IdentityTransform(), NullOrder.NULLS_FIRST + ).commit() + assert len(simple_table.sort_orders()) == 3 # 0: empty sort order from creating tables, 1: first sort order, 2: second sort order + assert simple_table.sort_order() == SortOrder( + SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST), + SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST), + order_id=2, + ) From 8080fa526f25a240548312ec4e44518cf8798f10 
Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:36:01 +0100 Subject: [PATCH 25/36] refactor: remove the sort order builder --- pyiceberg/table/update/sorting.py | 71 +++++++++++++------------------ 1 file changed, 30 insertions(+), 41 deletions(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 0e8641a715..87c2cd9098 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -34,51 +34,21 @@ from pyiceberg.table import Transaction -class SortOrderBuilder: - def __init__(self, last_sort_order_id: int, case_sensitive: bool = True) -> None: - self._fields: List[SortField] = [] - self._case_sensitive = case_sensitive - self._last_sort_order_id = last_sort_order_id - - def add_sort_field( - self, - source_id: int, - transform: Transform[Any, Any], - direction: SortDirection, - null_order: NullOrder, - ) -> SortOrderBuilder: - """Add a sort field to the sort order list.""" - self._fields.append( - SortField( - source_id=source_id, - transform=transform, - direction=direction, - null_order=null_order, - ) - ) - return self - - @property - def sort_order(self) -> SortOrder: - """Return the sort order.""" - return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) - - class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): _transaction: Transaction - _builder: SortOrderBuilder _last_assigned_order_id: int _case_sensitive: bool + _fields: List[SortField] + _last_sort_order_id: int def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: super().__init__(transaction) - self._builder = SortOrderBuilder( - case_sensitive=case_sensitive, - last_sort_order_id=transaction.table_metadata.default_sort_order_id, - ) - self._case_sensitive = case_sensitive + self._fields: List[SortField] = [] + self._case_sensitive: bool = case_sensitive + self._last_sort_order_id: int = transaction.table_metadata.default_sort_order_id def _column_name_to_id(self, 
column_name: str) -> int: + """Maps the column name to the column field id.""" return ( self._transaction.table_metadata.schema() .find_field( @@ -87,27 +57,46 @@ def _column_name_to_id(self, column_name: str) -> int: ) .field_id ) + + def _add_sort_field( + self, + source_id: int, + transform: Transform[Any, Any], + direction: SortDirection, + null_order: NullOrder, + ) -> UpdateSortOrder: + """Add a sort field to the sort order list.""" + self._fields.append( + SortField( + source_id=source_id, + transform=transform, + direction=direction, + null_order=null_order, + ) + ) + return self def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - self._builder.add_sort_field( + """Adds a sort field with ascending order.""" + return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, direction=SortDirection.ASC, null_order=null_order, ) - return self def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - self._builder.add_sort_field( + """Adds a sort field with descending order.""" + return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, direction=SortDirection.DESC, null_order=null_order, ) - return self def _apply(self) -> SortOrder: - return self._builder.sort_order + """Returns the sort order""" + return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) def _commit(self) -> UpdatesAndRequirements: """Apply the pending changes and commit.""" From e77a2c1b113e915fd14bf37d076f177875609666 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:36:34 +0100 Subject: [PATCH 26/36] chore: remove sort order builder --- tests/integration/test_sort_order_update.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 
c41edd5966..1272958711 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -23,7 +23,6 @@ from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder -from pyiceberg.table.update.sorting import SortOrderBuilder from pyiceberg.transforms import ( IdentityTransform, ) @@ -66,17 +65,6 @@ def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) -@pytest.mark.integration -def test_sort_order_builder() -> None: - builder = SortOrderBuilder(last_sort_order_id=0) - builder.add_sort_field(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST) - builder.add_sort_field(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST) - assert builder.sort_order == SortOrder( - SortField(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST), - SortField(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST), - ) - - @pytest.mark.integration @pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: From fc32b2832c997de0dbbdb84d6225332fbef1186b Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Fri, 24 Jan 2025 10:39:50 +0100 Subject: [PATCH 27/36] chore: lint --- pyiceberg/table/update/sorting.py | 10 +++++----- tests/integration/test_sort_order_update.py | 4 +++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 87c2cd9098..19d1b4affa 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -48,7 +48,7 @@ def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> Non 
self._last_sort_order_id: int = transaction.table_metadata.default_sort_order_id def _column_name_to_id(self, column_name: str) -> int: - """Maps the column name to the column field id.""" + """Map the column name to the column field id.""" return ( self._transaction.table_metadata.schema() .find_field( @@ -57,7 +57,7 @@ def _column_name_to_id(self, column_name: str) -> int: ) .field_id ) - + def _add_sort_field( self, source_id: int, @@ -77,7 +77,7 @@ def _add_sort_field( return self def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - """Adds a sort field with ascending order.""" + """Add a sort field with ascending order.""" return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, @@ -86,7 +86,7 @@ def asc(self, source_column_name: str, transform: Transform[Any, Any], null_orde ) def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: - """Adds a sort field with descending order.""" + """Add a sort field with descending order.""" return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), transform=transform, @@ -95,7 +95,7 @@ def desc(self, source_column_name: str, transform: Transform[Any, Any], null_ord ) def _apply(self) -> SortOrder: - """Returns the sort order""" + """Return the sort order.""" return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) def _commit(self) -> UpdatesAndRequirements: diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 1272958711..857d5b3841 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -99,7 +99,9 @@ def test_replace_existing_sort_order(catalog: Catalog, table_schema_simple: Sche simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_LAST).desc( "bar", IdentityTransform(), 
NullOrder.NULLS_FIRST ).commit() - assert len(simple_table.sort_orders()) == 3 # 0: empty sort order from creating tables, 1: first sort order, 2: second sort order + assert ( + len(simple_table.sort_orders()) == 3 + ) # 0: empty sort order from creating tables, 1: first sort order, 2: second sort order assert simple_table.sort_order() == SortOrder( SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_LAST), SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_FIRST), From fa1aa50c5c7c54976c3993b1c8c643700530fc7c Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sat, 25 Jan 2025 10:40:29 +0100 Subject: [PATCH 28/36] chore: update comment --- pyiceberg/table/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index effa1c06e6..ce3c8d72eb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1096,10 +1096,10 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive ) def update_sort_order(self) -> UpdateSortOrder: - """Create a new ReplaceSortOrder to replace the sort order of this table. + """Create a new UpdateSortOrder to update the sort order of this table. Returns: - A new ReplaceSortOrder. + A new UpdateSortOrder. 
""" return UpdateSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=True) From 2e9cd3f1191175caf391c0a025aaa7d77f51fa15 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:17:07 +0100 Subject: [PATCH 29/36] test: parametrize over iceberg format versions and remove unnused code --- tests/integration/test_sort_order_update.py | 70 ++++++++++----------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 857d5b3841..0dbab8c9d3 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -26,34 +26,10 @@ from pyiceberg.transforms import ( IdentityTransform, ) -from pyiceberg.types import ( - LongType, - NestedField, - StringType, - TimestampType, -) - - -def _simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table: - return _create_table_with_schema(catalog, table_schema_simple, "1") - - -def _table(catalog: Catalog) -> Table: - schema_with_timestamp = Schema( - NestedField(1, "id", LongType(), required=False), - NestedField(2, "event_ts", TimestampType(), required=False), - NestedField(3, "str", StringType(), required=False), - ) - return _create_table_with_schema(catalog, schema_with_timestamp, "1") -def _table_v2(catalog: Catalog) -> Table: - schema_with_timestamp = Schema( - NestedField(1, "id", LongType(), required=False), - NestedField(2, "event_ts", TimestampType(), required=False), - NestedField(3, "str", StringType(), required=False), - ) - return _create_table_with_schema(catalog, schema_with_timestamp, "2") +def _simple_table(catalog: Catalog, table_schema_simple: Schema, format_version: str) -> Table: + return _create_table_with_schema(catalog, table_schema_simple, format_version) def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table: @@ -66,17 +42,33 @@ def _create_table_with_schema(catalog: Catalog, schema: 
Schema, format_version: @pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) -def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: - simple_table = _simple_table(catalog, table_schema_simple) +@pytest.mark.parametrize( + "catalog, format_version", + [ + (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog_hive"), "1"), + (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog_hive"), "2"), + ] +) +def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple, format_version) for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items(): assert col_id == simple_table.update_sort_order()._column_name_to_id(col_name) @pytest.mark.integration -@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) -def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: - simple_table = _simple_table(catalog, table_schema_simple) +@pytest.mark.parametrize( + "catalog, format_version", + [ + (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog_hive"), "1"), + (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog_hive"), "2"), + ] +) +def test_replace_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple, format_version) simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc( "bar", IdentityTransform(), NullOrder.NULLS_LAST ).commit() @@ -88,9 +80,17 @@ def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> No @pytest.mark.integration -@pytest.mark.parametrize("catalog", 
[pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) -def test_replace_existing_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: - simple_table = _simple_table(catalog, table_schema_simple) +@pytest.mark.parametrize( + "catalog, format_version", + [ + (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog_hive"), "1"), + (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog_hive"), "2"), + ] +) +def test_replace_existing_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple, format_version) simple_table.update_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).commit() assert simple_table.sort_order() == SortOrder( SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), From 137dbd9551b2172ef5c2d146d4a3d8de5dc9724f Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:17:57 +0100 Subject: [PATCH 30/36] chore: fmt --- tests/integration/test_sort_order_update.py | 24 ++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_sort_order_update.py b/tests/integration/test_sort_order_update.py index 0dbab8c9d3..bfac783e9e 100644 --- a/tests/integration/test_sort_order_update.py +++ b/tests/integration/test_sort_order_update.py @@ -43,13 +43,13 @@ def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: @pytest.mark.integration @pytest.mark.parametrize( - "catalog, format_version", + "catalog, format_version", [ - (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog"), "1"), (pytest.lazy_fixture("session_catalog_hive"), "1"), - (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog"), "2"), (pytest.lazy_fixture("session_catalog_hive"), 
"2"), - ] + ], ) def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple, format_version) @@ -59,13 +59,13 @@ def test_map_column_name_to_id(catalog: Catalog, format_version: str, table_sche @pytest.mark.integration @pytest.mark.parametrize( - "catalog, format_version", + "catalog, format_version", [ - (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog"), "1"), (pytest.lazy_fixture("session_catalog_hive"), "1"), - (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog"), "2"), (pytest.lazy_fixture("session_catalog_hive"), "2"), - ] + ], ) def test_replace_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple, format_version) @@ -81,13 +81,13 @@ def test_replace_sort_order(catalog: Catalog, format_version: str, table_schema_ @pytest.mark.integration @pytest.mark.parametrize( - "catalog, format_version", + "catalog, format_version", [ - (pytest.lazy_fixture("session_catalog"), "1"), + (pytest.lazy_fixture("session_catalog"), "1"), (pytest.lazy_fixture("session_catalog_hive"), "1"), - (pytest.lazy_fixture("session_catalog"), "2"), + (pytest.lazy_fixture("session_catalog"), "2"), (pytest.lazy_fixture("session_catalog_hive"), "2"), - ] + ], ) def test_replace_existing_sort_order(catalog: Catalog, format_version: str, table_schema_simple: Schema) -> None: simple_table = _simple_table(catalog, table_schema_simple, format_version) From d8b90012d3fcbde0b68b9f9e208f5730a76dd159 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:19:56 +0100 Subject: [PATCH 31/36] Update pyiceberg/table/update/sorting.py fix: set default Co-authored-by: Fokko Driesprong --- pyiceberg/table/update/sorting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sorting.py 
b/pyiceberg/table/update/sorting.py index 19d1b4affa..43256b35fc 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -85,7 +85,7 @@ def asc(self, source_column_name: str, transform: Transform[Any, Any], null_orde null_order=null_order, ) - def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: + def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST) -> UpdateSortOrder: """Add a sort field with descending order.""" return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), From 58d302de58e75da4a7420e128831521b8b0ec772 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:24:16 +0100 Subject: [PATCH 32/36] Update pyiceberg/table/__init__.py chore: update signature Co-authored-by: Fokko Driesprong --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c195b1e684..8f9d76ed29 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1123,7 +1123,7 @@ def update_sort_order(self) -> UpdateSortOrder: Returns: A new UpdateSortOrder. 
""" - return UpdateSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=True) + return UpdateSortOrder(transaction=Transaction(self, autocommit=True), case_sensitive=case_sensitive) def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" From fd0e287331097d8a82e71bb884ca46ab5e9154f3 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:25:41 +0100 Subject: [PATCH 33/36] chore: add arg --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8f9d76ed29..59abcf3a43 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1117,7 +1117,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive name_mapping=self.name_mapping(), ) - def update_sort_order(self) -> UpdateSortOrder: + def update_sort_order(self, case_sensitive: bool = True) -> UpdateSortOrder: """Create a new UpdateSortOrder to update the sort order of this table. 
Returns: From 5e57697a9eafa5d76800d5b9f9cddb9a011a1d84 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:26:20 +0100 Subject: [PATCH 34/36] chore: fmt --- pyiceberg/table/update/sorting.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 43256b35fc..05657c84e4 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -85,7 +85,9 @@ def asc(self, source_column_name: str, transform: Transform[Any, Any], null_orde null_order=null_order, ) - def desc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST) -> UpdateSortOrder: + def desc( + self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST + ) -> UpdateSortOrder: """Add a sort field with descending order.""" return self._add_sort_field( source_id=self._column_name_to_id(source_column_name), From 51319032cb3d4ed2fc6a1a6525caed645ed5b9a3 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:47:23 +0100 Subject: [PATCH 35/36] docs: update docs --- mkdocs/docs/api.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index e904662871..e72f9e0a78 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1194,6 +1194,24 @@ with table.update_spec() as update: update.rename_field("bucketed_id", "sharded_id") ``` +## Sort order updates + +Users can update the sort order on existing tables for new data. See [sorting](https://iceberg.apache.org/spec/#sorting) for more details. + +The API to use when updating a sort order is the `update_sort_order` API on the table. + +Sort orders can only be updated by adding a new sort order. They cannot be deleted or modified. 
+ +### Updating a sort order on a table + +To create a new sort order, you can use either the `asc` or `desc` API depending on whether you want your data sorted in ascending or descending order. Both take the name of the field, the sort order transform, and a null order that describes the order of null values when sorted. + +```python +with table.update_sort_order() as update: + update.desc("event_ts", DayTransform(), NullOrder.NULLS_FIRST) + update.asc("some_field", IdentityTransform(), NullOrder.NULLS_LAST) +``` + ## Table properties Set and remove properties through the `Transaction` API: From cc1ae1c2e17f96f4ef62a58fb491cca1ab3202b6 Mon Sep 17 00:00:00 2001 From: Jasper Ginn Date: Sun, 16 Feb 2025 15:47:43 +0100 Subject: [PATCH 36/36] chore: set default --- pyiceberg/table/update/sorting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/update/sorting.py b/pyiceberg/table/update/sorting.py index 05657c84e4..4df17d700c 100644 --- a/pyiceberg/table/update/sorting.py +++ b/pyiceberg/table/update/sorting.py @@ -76,7 +76,7 @@ def _add_sort_field( ) return self - def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder) -> UpdateSortOrder: + def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST) -> UpdateSortOrder: """Add a sort field with ascending order.""" return self._add_sort_field( source_id=self._column_name_to_id(source_column_name),