From 37c954d5d925cdf35097d8df2861464f31592508 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 12 Mar 2025 15:27:17 +0800 Subject: [PATCH 1/6] feat: add project transaction operation for pylance sdk --- python/python/lance/dataset.py | 46 ++++++++++++++++++++++ python/python/tests/test_dataset.py | 61 +++++++++++++++++++++++++++++ python/src/transaction.rs | 6 +++ 3 files changed, 113 insertions(+) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 7dd4c282fb..8526df94ec 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2957,6 +2957,52 @@ class DataReplacement(BaseOperation): replacements: List[LanceOperation.DataReplacementGroup] + @dataclass + class Project(BaseOperation): + """ + Operation that project columns. Use this operator for drop column or rename/swap column. + + Attributes + ---------- + schema: LanceSchema or pyarrow.Schema + The schema of the new dataset. Passing a LanceSchema is preferred, + and passing a pyarrow.Schema is deprecated. + + Examples + -------- + Use the projece operator to swap column: + + >>> import lance + >>> import pyarrow as pa + >>> import pyarrow.compute as pc + >>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]}) + >>> dataset = lance.write_dataset(table, "example") + >>> dataset.to_table().to_pandas() + a b b1 + 0 1 a c + 1 2 b d + >>> + >>> ## rename column `b` into `b0` and rename b1 into `b` + >>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]}) + >>> operation = lance.LanceOperation.Project(table.schema) + >>> + >>> dataset = lance.LanceDataset.commit("example", operation, read_version=1) + >>> dataset.to_table().to_pandas() + a b0 b + 0 1 a c + 1 2 b d + """ + + schema: LanceSchema | pa.Schema + + def __post_init__(self): + if isinstance(self.schema, pa.Schema): + warnings.warn( + "Passing a pyarrow.Schema to Project is deprecated. " + "Please use a LanceSchema instead.", + DeprecationWarning, + ) + self.schema = LanceSchema.from_pyarrow(self.schema) class ScannerBuilder: def __init__(self, ds: LanceDataset): diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index f0adc2acda..9d3ee78de6 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2984,6 +2984,67 @@ def test_data_replacement(tmp_path: Path): ) assert tbl == expected +def test_schema_project_drop_column(tmp_path: Path): + table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + schema = pa.Table.from_pydict({"a": range(1)}).schema + + project = lance.LanceOperation.Project(schema) + dataset = lance.LanceDataset.commit(dataset, project, read_version=1) + + tbl = dataset.to_table() + + expected = pa.Table.from_pydict( + { + "a": list(range(100, 200)), + } + ) + assert tbl == expected + +def test_schema_project_rename_column(tmp_path: Path): + table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + schema = pa.Table.from_pydict({"c": range(1), "d": range(1)}).schema + + project = lance.LanceOperation.Project(schema) + dataset = lance.LanceDataset.commit(dataset, project, read_version=1) + + tbl = dataset.to_table() + + expected = pa.Table.from_pydict( + { + "c": list(range(100, 200)), + "d": list(range(300, 400)), + } + ) + assert tbl == expected + +def test_schema_project_swap_column(tmp_path: Path): + table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) + base_dir = tmp_path / "test" + + dataset = lance.write_dataset(table, base_dir) + + schema = pa.Table.from_pydict({"b": range(1), "a": range(1)}).schema + + project = lance.LanceOperation.Project(schema) + dataset = lance.LanceDataset.commit(dataset, project, read_version=1) + + tbl = dataset.to_table() + + expected = pa.Table.from_pydict( + { + "b": list(range(100, 200)), + "a": list(range(300, 400)), + } + ) + assert tbl == expected def test_empty_structs(tmp_path): schema = pa.schema([pa.field("id", pa.int32()), pa.field("empties", pa.struct([]))]) diff --git a/python/src/transaction.rs b/python/src/transaction.rs index 33ed60eaa5..b09087bd06 100644 --- a/python/src/transaction.rs +++ b/python/src/transaction.rs @@ -157,6 +157,12 @@ impl FromPyObject<'_> for PyLance { Ok(Self(op)) } + "Project" => { + let schema = extract_schema(&ob.getattr("schema")?)?; + + let op = Operation::Project { schema }; + Ok(Self(op)) + } unsupported => Err(PyValueError::new_err(format!( "Unsupported operation: {unsupported}", ))), From 506f930578837e744d3af610ed544f6ba96adb51 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 12 Mar 2025 15:32:23 +0800 Subject: [PATCH 2/6] fix python format --- python/python/lance/dataset.py | 1 + python/python/tests/test_dataset.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 8526df94ec..51ff9fc1a8 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3004,6 +3004,7 @@ def __post_init__(self): ) self.schema = LanceSchema.from_pyarrow(self.schema) + class ScannerBuilder: def __init__(self, ds: LanceDataset): self.ds = ds diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9d3ee78de6..e4e2158ff3 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -2984,6 +2984,7 @@ def test_data_replacement(tmp_path: Path): ) assert tbl == expected + def test_schema_project_drop_column(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) base_dir = tmp_path / "test" @@ -3004,6 +3005,7 @@ def test_schema_project_drop_column(tmp_path: Path): ) assert tbl == expected + def test_schema_project_rename_column(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) base_dir = tmp_path / "test" @@ -3025,6 +3027,7 @@ def test_schema_project_rename_column(tmp_path: Path): ) assert tbl == expected + def test_schema_project_swap_column(tmp_path: Path): table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)}) base_dir = tmp_path / "test" @@ -3046,6 +3049,7 @@ def test_schema_project_swap_column(tmp_path: Path): ) assert tbl == expected + def test_empty_structs(tmp_path): schema = pa.schema([pa.field("id", pa.int32()), pa.field("empties", pa.struct([]))]) table = pa.table({"id": [0, 1, 2], "empties": [{}] * 3}, schema=schema) From 0e349f263759e82d68d0d907093621ee3c6df99d Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Wed, 12 Mar 2025 15:45:39 +0800 Subject: [PATCH 3/6] fix python format --- python/python/lance/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 51ff9fc1a8..9179f355db 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2960,7 +2960,8 @@ class DataReplacement(BaseOperation): @dataclass class Project(BaseOperation): """ - Operation that project columns. Use this operator for drop column or rename/swap column. + Operation that project columns. + Use this operator for drop column or rename/swap column. Attributes ---------- From bad98a055dabafeb489e882a5163b92fb994c71a Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Thu, 13 Mar 2025 10:25:22 +0800 Subject: [PATCH 4/6] fix comment --- python/python/lance/dataset.py | 17 ++++------------- python/python/tests/test_dataset.py | 10 +++++++--- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 9179f355db..5a583734d1 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2976,6 +2976,7 @@ class Project(BaseOperation): >>> import lance >>> import pyarrow as pa >>> import pyarrow.compute as pc + >>> from lance.schema import LanceSchema >>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]}) >>> dataset = lance.write_dataset(table, "example") >>> dataset.to_table().to_pandas() @@ -2985,8 +2986,8 @@ class Project(BaseOperation): >>> >>> ## rename column `b` into `b0` and rename b1 into `b` >>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]}) - >>> operation = lance.LanceOperation.Project(table.schema) - >>> + >>> lance_schema = LanceSchema.from_pyarrow(table.schema) + >>> operation = lance.LanceOperation.Project(lance_schema) >>> dataset = lance.LanceDataset.commit("example", operation, read_version=1) >>> dataset.to_table().to_pandas() a b0 b @@ -2994,17 +2995,7 @@ class Project(BaseOperation): 1 2 b d """ - schema: LanceSchema | pa.Schema - - def __post_init__(self): - if isinstance(self.schema, pa.Schema): - warnings.warn( - "Passing a pyarrow.Schema to Project is deprecated. " - "Please use a LanceSchema instead.", - DeprecationWarning, - ) - self.schema = LanceSchema.from_pyarrow(self.schema) - + schema: LanceSchema class ScannerBuilder: def __init__(self, ds: LanceDataset): diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index e4e2158ff3..19142204a5 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -30,6 +30,7 @@ from lance._dataset.sharded_batch_iterator import ShardedBatchIterator from lance.commit import CommitConflictError from lance.debug import format_fragment +from lance.schema import LanceSchema from lance.util import validate_vector_index # Various valid inputs for write_dataset @@ -2992,8 +2993,9 @@ def test_schema_project_drop_column(tmp_path: Path): dataset = lance.write_dataset(table, base_dir) schema = pa.Table.from_pydict({"a": range(1)}).schema + lance_schema = LanceSchema.from_pyarrow(schema) - project = lance.LanceOperation.Project(schema) + project = lance.LanceOperation.Project(lance_schema) dataset = lance.LanceDataset.commit(dataset, project, read_version=1) tbl = dataset.to_table() @@ -3013,8 +3015,9 @@ def test_schema_project_rename_column(tmp_path: Path): dataset = lance.write_dataset(table, base_dir) schema = pa.Table.from_pydict({"c": range(1), "d": range(1)}).schema + lance_schema = LanceSchema.from_pyarrow(schema) - project = lance.LanceOperation.Project(schema) + project = lance.LanceOperation.Project(lance_schema) dataset = lance.LanceDataset.commit(dataset, project, read_version=1) tbl = dataset.to_table() @@ -3035,8 +3038,9 @@ def test_schema_project_swap_column(tmp_path: Path): dataset = lance.write_dataset(table, base_dir) schema = pa.Table.from_pydict({"b": range(1), "a": range(1)}).schema + lance_schema = LanceSchema.from_pyarrow(schema) - project = lance.LanceOperation.Project(schema) + project = lance.LanceOperation.Project(lance_schema) dataset = lance.LanceDataset.commit(dataset, project, read_version=1) tbl = dataset.to_table() From da2da96df85a5d227a15f3fc6a601f37b38815eb Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Thu, 13 Mar 2025 10:30:39 +0800 Subject: [PATCH 5/6] fix format --- python/python/lance/dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 5a583734d1..c8e8fb02fe 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2997,6 +2997,7 @@ class Project(BaseOperation): schema: LanceSchema + class ScannerBuilder: def __init__(self, ds: LanceDataset): self.ds = ds From 777e49a4dfa6a537e234a5344de846d344bff421 Mon Sep 17 00:00:00 2001 From: huangzhaowei Date: Thu, 13 Mar 2025 11:45:58 +0800 Subject: [PATCH 6/6] remove comment --- python/python/lance/dataset.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index c8e8fb02fe..cc35ee1768 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2965,9 +2965,8 @@ class Project(BaseOperation): Attributes ---------- - schema: LanceSchema or pyarrow.Schema - The schema of the new dataset. Passing a LanceSchema is preferred, - and passing a pyarrow.Schema is deprecated. + schema: LanceSchema + The lance schema of the new dataset. Examples --------