Skip to content

Commit 42722fb

Browse files
feat: allow replacement of entire datafile when the schema lines up correctly (#3408)
For the internal design doc, see Notion or ping me. What this PR implements: ![image](https://github.com/user-attachments/assets/85219024-a4a4-4b9a-bfb7-2b2f4990e68d) Plan of attack: * This PR: basic functionality, i.e. when there is no conflict, calling this tx should just work * Next PR: implement more fine-grained conflict resolution * Potential future PR (when time permits): allow partial replacement of a datafile. This can be done by "dropping" column indices in a datafile, thereby dropping the column in favor of another. TODO: - [x] proto definition of the new transaction - [x] simple Rust tests - [x] test error handling - [x] PR desc - [x] Python tests - [x] implement conflict detection
1 parent 1ea5909 commit 42722fb

File tree

8 files changed

+711
-24
lines changed

8 files changed

+711
-24
lines changed

protos/transaction.proto

+12-1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,16 @@ message Transaction {
173173
}
174174
}
175175

176+
// One entry of a DataReplacement operation: pairs a fragment id with a new
// data file.
message DataReplacementGroup {
177+
// Id of the fragment this entry refers to (presumably the fragment whose
// data file is being replaced -- confirm against the commit logic).
uint64 fragment_id = 1;
178+
// The data file supplied as the replacement.
DataFile new_file = 2;
179+
}
180+
181+
// An operation that replaces the data in a region of the table with new data.
182+
message DataReplacement {
183+
// The individual (fragment id, new data file) pairs carried by this
// operation.
repeated DataReplacementGroup replacements = 1;
184+
}
185+
176186
// The operation of this transaction.
177187
oneof operation {
178188
Append append = 100;
@@ -186,11 +196,12 @@ message Transaction {
186196
Update update = 108;
187197
Project project = 109;
188198
UpdateConfig update_config = 110;
199+
DataReplacement data_replacement = 111;
189200
}
190201

191202
// An operation to apply to the blob dataset
192203
oneof blob_operation {
193204
Append blob_append = 200;
194205
Overwrite blob_overwrite = 202;
195206
}
196-
}
207+
}

python/python/lance/dataset.py

+21-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
)
4646
from .dependencies import numpy as np
4747
from .dependencies import pandas as pd
48-
from .fragment import FragmentMetadata, LanceFragment
48+
from .fragment import DataFile, FragmentMetadata, LanceFragment
4949
from .lance import (
5050
CleanupStats,
5151
Compaction,
@@ -1927,7 +1927,7 @@ def create_index(
19271927
valid_index_types = ["IVF_FLAT", "IVF_PQ", "IVF_HNSW_PQ", "IVF_HNSW_SQ"]
19281928
if index_type not in valid_index_types:
19291929
raise NotImplementedError(
1930-
f"Only {valid_index_types} index types supported. " f"Got {index_type}"
1930+
f"Only {valid_index_types} index types supported. Got {index_type}"
19311931
)
19321932
if index_type != "IVF_PQ" and one_pass_ivfpq:
19331933
raise ValueError(
@@ -2247,8 +2247,7 @@ def _commit(
22472247
commit_lock: Optional[CommitLock] = None,
22482248
) -> LanceDataset:
22492249
warnings.warn(
2250-
"LanceDataset._commit() is deprecated, use LanceDataset.commit()"
2251-
" instead",
2250+
"LanceDataset._commit() is deprecated, use LanceDataset.commit() instead",
22522251
DeprecationWarning,
22532252
)
22542253
return LanceDataset.commit(base_uri, operation, read_version, commit_lock)
@@ -2935,6 +2934,23 @@ class CreateIndex(BaseOperation):
29352934
dataset_version: int
29362935
fragment_ids: Set[int]
29372936

2937+
@dataclass
2938+
class DataReplacementGroup:
2939+
"""
2940+
A single entry of a data replacement: a fragment id paired with a new data file.
2941+
"""
2942+
2943+
# Id of the fragment this entry refers to.
fragment_id: int
2944+
# The data file supplied as the replacement.
new_file: DataFile
2945+
2946+
@dataclass
2947+
class DataReplacement(BaseOperation):
2948+
"""
2949+
Operation that replaces existing datafiles in the dataset.
2950+
"""
2951+
2952+
# One DataReplacementGroup per (fragment id, new data file) pair.
replacements: List[LanceOperation.DataReplacementGroup]
2953+
29382954

29392955
class ScannerBuilder:
29402956
def __init__(self, ds: LanceDataset):
@@ -3203,7 +3219,7 @@ def nearest(
32033219

32043220
if q_dim != dim:
32053221
raise ValueError(
3206-
f"Query vector size {len(q)} does not match index column size" f" {dim}"
3222+
f"Query vector size {len(q)} does not match index column size {dim}"
32073223
)
32083224

32093225
if k is not None and int(k) <= 0:

python/python/lance/file.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def take_rows(
134134
if indices[i] > indices[i + 1]:
135135
raise ValueError(
136136
f"Indices must be sorted in ascending order for \
137-
file API, got {indices[i]} > {indices[i+1]}"
137+
file API, got {indices[i]} > {indices[i + 1]}"
138138
)
139139

140140
return ReaderResults(

python/python/lance/ray/sink.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def on_write_complete(
161161

162162
if len(write_results) == 0:
163163
warnings.warn(
164-
"write results is empty. please check ray version " "or internal error",
164+
"write results is empty. please check ray version or internal error",
165165
DeprecationWarning,
166166
)
167167
return

python/python/tests/test_dataset.py

+25
Original file line numberDiff line numberDiff line change
@@ -2913,3 +2913,28 @@ def test_dataset_schema(tmp_path: Path):
29132913
ds = lance.write_dataset(table, str(tmp_path)) # noqa: F841
29142914
ds._default_scan_options = {"with_row_id": True}
29152915
assert ds.schema == ds.to_table().schema
2916+
2917+
2918+
# Commits a DataReplacement for fragment id 0 and checks that reading the
# dataset afterwards yields the replacement values.
def test_data_replacement(tmp_path: Path):
2919+
# Initial dataset: two columns holding 0..99.
table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
2920+
base_dir = tmp_path / "test"
2921+
2922+
dataset = lance.write_dataset(table, base_dir)
2923+
2924+
# Same two columns with values 100..199, written as a separate fragment under
# the same base directory so its data file can be handed to the operation.
table = pa.Table.from_pydict({"a": range(100, 200), "b": range(100, 200)})
2925+
fragment = lance.fragment.LanceFragment.create(base_dir, table)
2926+
data_file = fragment.files[0]
2927+
# Single replacement group: fragment id 0 paired with the new data file.
data_replacement = lance.LanceOperation.DataReplacement(
2928+
[lance.LanceOperation.DataReplacementGroup(0, data_file)]
2929+
)
2930+
dataset = lance.LanceDataset.commit(dataset, data_replacement, read_version=1)
2931+
2932+
tbl = dataset.to_table()
2933+
2934+
# The committed dataset is expected to contain exactly the replacement values.
expected = pa.Table.from_pydict(
2935+
{
2936+
"a": list(range(100, 200)),
2937+
"b": list(range(100, 200)),
2938+
}
2939+
)
2940+
assert tbl == expected

python/src/transaction.rs

+44-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
use arrow::pyarrow::PyArrowType;
55
use arrow_schema::Schema as ArrowSchema;
6-
use lance::dataset::transaction::{Operation, RewriteGroup, RewrittenIndex, Transaction};
6+
use lance::dataset::transaction::{
7+
DataReplacementGroup, Operation, RewriteGroup, RewrittenIndex, Transaction,
8+
};
79
use lance::datatypes::Schema;
8-
use lance_table::format::{Fragment, Index};
10+
use lance_table::format::{DataFile, Fragment, Index};
911
use pyo3::exceptions::PyValueError;
1012
use pyo3::types::PySet;
1113
use pyo3::{intern, prelude::*};
@@ -15,6 +17,32 @@ use uuid::Uuid;
1517
use crate::schema::LanceSchema;
1618
use crate::utils::{class_name, export_vec, extract_vec, PyLance};
1719

20+
/// Builds a `DataReplacementGroup` from a Python object that exposes
/// `fragment_id` and `new_file` attributes.
impl FromPyObject<'_> for PyLance<DataReplacementGroup> {
21+
/// Reads `fragment_id` as a `u64` and `new_file` as a `PyLance<DataFile>`.
///
/// # Errors
/// Propagates the Python error if either attribute is missing or cannot be
/// extracted as the requested type.
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
22+
let fragment_id = ob.getattr("fragment_id")?.extract::<u64>()?;
23+
let new_file = &ob.getattr("new_file")?.extract::<PyLance<DataFile>>()?;
24+
25+
// `new_file` is held by reference above, so the inner `DataFile` is cloned
// into the tuple struct.
Ok(Self(DataReplacementGroup(fragment_id, new_file.0.clone())))
26+
}
27+
}
28+
29+
/// Exports a `DataReplacementGroup` as an instance of the Python class
/// `lance.LanceOperation.DataReplacementGroup`.
impl ToPyObject for PyLance<&DataReplacementGroup> {
30+
/// Constructs the Python object by calling the class with
/// `(fragment_id, new_file)` as positional arguments.
///
/// # Panics
/// Panics if the `lance` module or its `LanceOperation` attribute cannot be
/// resolved, if `DataReplacementGroup` is not found on it, or if calling the
/// class raises a Python exception.
fn to_object(&self, py: Python<'_>) -> PyObject {
31+
let namespace = py
32+
.import_bound(intern!(py, "lance"))
33+
.and_then(|module| module.getattr(intern!(py, "LanceOperation")))
34+
.expect("Failed to import LanceOperation namespace");
35+
36+
// Tuple-struct fields: `.0` is the fragment id, `.1` is the data file.
let fragment_id = self.0 .0;
37+
let new_file = PyLance(&self.0 .1).to_object(py);
38+
39+
let cls = namespace
40+
.getattr("DataReplacementGroup")
41+
.expect("Failed to get DataReplacementGroup class");
42+
cls.call1((fragment_id, new_file)).unwrap().to_object(py)
43+
}
44+
}
45+
1846
impl FromPyObject<'_> for PyLance<Operation> {
1947
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
2048
match class_name(ob)? {
@@ -118,6 +146,13 @@ impl FromPyObject<'_> for PyLance<Operation> {
118146
};
119147
Ok(Self(op))
120148
}
149+
"DataReplacement" => {
150+
let replacements = extract_vec(&ob.getattr("replacements")?)?;
151+
152+
let op = Operation::DataReplacement { replacements };
153+
154+
Ok(Self(op))
155+
}
121156
unsupported => Err(PyValueError::new_err(format!(
122157
"Unsupported operation: {unsupported}",
123158
))),
@@ -172,6 +207,13 @@ impl ToPyObject for PyLance<&Operation> {
172207
.unwrap()
173208
.to_object(py)
174209
}
210+
Operation::DataReplacement { replacements } => {
211+
let replacements = export_vec(py, replacements.as_slice());
212+
let cls = namespace
213+
.getattr("DataReplacement")
214+
.expect("Failed to get DataReplacement class");
215+
cls.call1((replacements,)).unwrap().to_object(py)
216+
}
175217
_ => todo!(),
176218
}
177219
}

0 commit comments

Comments
 (0)