Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add project transaction operation for pylance sdk #3538

Merged
merged 7 commits into from
Mar 13, 2025
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
@@ -2957,6 +2957,54 @@ class DataReplacement(BaseOperation):

replacements: List[LanceOperation.DataReplacementGroup]

@dataclass
class Project(BaseOperation):
"""
Operation that project columns.
Use this operator for drop column or rename/swap column.
Attributes
----------
schema: LanceSchema or pyarrow.Schema
The schema of the new dataset. Passing a LanceSchema is preferred,
and passing a pyarrow.Schema is deprecated.
Examples
--------
Use the projece operator to swap column:
>>> import lance
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table = pa.table({"a": [1, 2], "b": ["a", "b"], "b1": ["c", "d"]})
>>> dataset = lance.write_dataset(table, "example")
>>> dataset.to_table().to_pandas()
a b b1
0 1 a c
1 2 b d
>>>
>>> ## rename column `b` into `b0` and rename b1 into `b`
>>> table = pa.table({"a": [3, 4], "b0": ["a", "b"], "b": ["c", "d"]})
>>> operation = lance.LanceOperation.Project(table.schema)
>>>
>>> dataset = lance.LanceDataset.commit("example", operation, read_version=1)
>>> dataset.to_table().to_pandas()
a b0 b
0 1 a c
1 2 b d
"""

schema: LanceSchema | pa.Schema

def __post_init__(self):
if isinstance(self.schema, pa.Schema):
warnings.warn(
"Passing a pyarrow.Schema to Project is deprecated. "
"Please use a LanceSchema instead.",
DeprecationWarning,
)
self.schema = LanceSchema.from_pyarrow(self.schema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create brand new APIs that are deprecated?

Suggested change
schema: LanceSchema | pa.Schema
def __post_init__(self):
if isinstance(self.schema, pa.Schema):
warnings.warn(
"Passing a pyarrow.Schema to Project is deprecated. "
"Please use a LanceSchema instead.",
DeprecationWarning,
)
self.schema = LanceSchema.from_pyarrow(self.schema)
schema: LanceSchema

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I removed the pa.schema support



class ScannerBuilder:
def __init__(self, ds: LanceDataset):
65 changes: 65 additions & 0 deletions python/python/tests/test_dataset.py
Original file line number Diff line number Diff line change
@@ -2985,6 +2985,71 @@ def test_data_replacement(tmp_path: Path):
assert tbl == expected


def test_schema_project_drop_column(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)})
base_dir = tmp_path / "test"

dataset = lance.write_dataset(table, base_dir)

schema = pa.Table.from_pydict({"a": range(1)}).schema

project = lance.LanceOperation.Project(schema)
dataset = lance.LanceDataset.commit(dataset, project, read_version=1)

tbl = dataset.to_table()

expected = pa.Table.from_pydict(
{
"a": list(range(100, 200)),
}
)
assert tbl == expected


def test_schema_project_rename_column(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)})
base_dir = tmp_path / "test"

dataset = lance.write_dataset(table, base_dir)

schema = pa.Table.from_pydict({"c": range(1), "d": range(1)}).schema

project = lance.LanceOperation.Project(schema)
dataset = lance.LanceDataset.commit(dataset, project, read_version=1)

tbl = dataset.to_table()

expected = pa.Table.from_pydict(
{
"c": list(range(100, 200)),
"d": list(range(300, 400)),
}
)
assert tbl == expected


def test_schema_project_swap_column(tmp_path: Path):
table = pa.Table.from_pydict({"a": range(100, 200), "b": range(300, 400)})
base_dir = tmp_path / "test"

dataset = lance.write_dataset(table, base_dir)

schema = pa.Table.from_pydict({"b": range(1), "a": range(1)}).schema

project = lance.LanceOperation.Project(schema)
dataset = lance.LanceDataset.commit(dataset, project, read_version=1)

tbl = dataset.to_table()

expected = pa.Table.from_pydict(
{
"b": list(range(100, 200)),
"a": list(range(300, 400)),
}
)
assert tbl == expected


def test_empty_structs(tmp_path):
schema = pa.schema([pa.field("id", pa.int32()), pa.field("empties", pa.struct([]))])
table = pa.table({"id": [0, 1, 2], "empties": [{}] * 3}, schema=schema)
6 changes: 6 additions & 0 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -157,6 +157,12 @@ impl FromPyObject<'_> for PyLance<Operation> {

Ok(Self(op))
}
"Project" => {
let schema = extract_schema(&ob.getattr("schema")?)?;

let op = Operation::Project { schema };
Ok(Self(op))
}
unsupported => Err(PyValueError::new_err(format!(
"Unsupported operation: {unsupported}",
))),