Skip to content

Commit 80cb78c

Browse files
authored
feat: expose make_deletions_null to python as include_deleted_rows (#3533)
1 parent 8643409 commit 80cb78c

File tree

7 files changed

+144
-2
lines changed

7 files changed

+144
-2
lines changed

python/python/lance/dataset.py

+31
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ def scanner(
322322
io_buffer_size: Optional[int] = None,
323323
late_materialization: Optional[bool | List[str]] = None,
324324
use_scalar_index: Optional[bool] = None,
325+
include_deleted_rows: Optional[bool] = None,
325326
) -> LanceScanner:
326327
"""Return a Scanner that can support various pushdowns.
327328
@@ -414,6 +415,14 @@ def scanner(
414415
fast_search: bool, default False
415416
If True, then the search will only be performed on the indexed data, which
416417
yields faster search time.
418+
include_deleted_rows: bool, default False
419+
If True, then rows that have been deleted, but are still present in the
420+
fragment, will be returned. These rows will have the _rowid column set
421+
to null. All other columns will reflect the value stored on disk and are
421+
not necessarily null.
423+
424+
Note: if this is a search operation, or a take operation (including scalar
425+
indexed scans) then deleted rows cannot be returned.
417426
418427
Notes
419428
-----
@@ -463,6 +472,7 @@ def setopt(opt, val):
463472
setopt(builder.use_stats, use_stats)
464473
setopt(builder.use_scalar_index, use_scalar_index)
465474
setopt(builder.fast_search, fast_search)
475+
setopt(builder.include_deleted_rows, include_deleted_rows)
466476

467477
# columns=None has a special meaning. we can't treat it as "user didn't specify"
468478
if self._default_scan_options is None:
@@ -543,6 +553,7 @@ def to_table(
543553
io_buffer_size: Optional[int] = None,
544554
late_materialization: Optional[bool | List[str]] = None,
545555
use_scalar_index: Optional[bool] = None,
556+
include_deleted_rows: Optional[bool] = None,
546557
) -> pa.Table:
547558
"""Read the data into memory as a :py:class:`pyarrow.Table`
548559
@@ -612,6 +623,14 @@ def to_table(
612623
currently only supports a single column in the columns list.
613624
- query: str
614625
The query string to search for.
626+
include_deleted_rows: bool, optional, default False
627+
If True, then rows that have been deleted, but are still present in the
628+
fragment, will be returned. These rows will have the _rowid column set
629+
to null. All other columns will reflect the value stored on disk and are
629+
not necessarily null.
631+
632+
Note: if this is a search operation, or a take operation (including scalar
633+
indexed scans) then deleted rows cannot be returned.
615634
616635
Notes
617636
-----
@@ -639,6 +658,7 @@ def to_table(
639658
use_stats=use_stats,
640659
fast_search=fast_search,
641660
full_text_query=full_text_query,
661+
include_deleted_rows=include_deleted_rows,
642662
).to_table()
643663

644664
@property
@@ -2982,6 +3002,7 @@ def __init__(self, ds: LanceDataset):
29823002
self._fast_search = False
29833003
self._full_text_query = None
29843004
self._use_scalar_index = None
3005+
self._include_deleted_rows = None
29853006

29863007
def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder:
29873008
for key, value in default_opts.items():
@@ -3259,6 +3280,15 @@ def fast_search(self, flag: bool) -> ScannerBuilder:
32593280
self._fast_search = flag
32603281
return self
32613282

3283+
def include_deleted_rows(self, flag: bool) -> ScannerBuilder:
3284+
"""Include deleted rows
3285+
3286+
Rows which have been deleted, but are still present in the fragment, will be
3287+
returned. These rows will have the _rowid column set to null; other columns keep their on-disk values.
3288+
"""
3289+
self._include_deleted_rows = flag
3290+
return self
3291+
32623292
def full_text_search(
32633293
self,
32643294
query: str,
@@ -3296,6 +3326,7 @@ def to_scanner(self) -> LanceScanner:
32963326
self._full_text_query,
32973327
self._late_materialization,
32983328
self._use_scalar_index,
3329+
self._include_deleted_rows,
32993330
)
33003331
return LanceScanner(scanner, self.ds)
33013332

python/python/lance/lance/__init__.pyi

+1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ class _Dataset:
206206
full_text_query: Optional[dict] = None,
207207
late_materialization: Optional[bool | List[str]] = None,
208208
use_scalar_index: Optional[bool] = None,
209+
include_deleted_rows: Optional[bool] = None,
209210
) -> _Scanner: ...
210211
def count_rows(self, filter: Optional[str] = None) -> int: ...
211212
def take(

python/python/tests/test_dataset.py

+38
Original file line numberDiff line numberDiff line change
@@ -2261,6 +2261,44 @@ def test_scanner_schemas(tmp_path: Path):
22612261
assert scanner.projected_schema == pa.schema([pa.field("a", pa.int64())])
22622262

22632263

2264+
def test_scan_deleted_rows(tmp_path: Path):
2265+
base_dir = tmp_path / "dataset"
2266+
df = pd.DataFrame({"a": range(100), "b": range(100)})
2267+
ds = lance.write_dataset(df, base_dir, max_rows_per_file=25)
2268+
ds.create_scalar_index("b", "BTREE")
2269+
ds.delete("a < 30")
2270+
2271+
assert ds.count_rows() == 70
2272+
2273+
assert ds.scanner(with_row_id=True).to_table().num_rows == 70
2274+
with_deleted = ds.scanner(with_row_id=True, include_deleted_rows=True).to_table()
2275+
2276+
assert with_deleted.num_rows == 75
2277+
2278+
assert with_deleted.slice(0, 5) == pa.table(
2279+
{
2280+
"a": range(25, 30),
2281+
"b": range(25, 30),
2282+
"_rowid": pa.array([None] * 5, pa.uint64()),
2283+
}
2284+
)
2285+
2286+
assert (
2287+
ds.scanner(with_row_id=True, include_deleted_rows=True, filter="a < 32")
2288+
.to_table()
2289+
.num_rows
2290+
== 7
2291+
)
2292+
2293+
with pytest.raises(ValueError, match="Cannot include deleted rows"):
2294+
ds.scanner(
2295+
include_deleted_rows=True, with_row_id=True, filter="b < 30"
2296+
).to_table()
2297+
2298+
with pytest.raises(ValueError, match="with_row_id is false"):
2299+
ds.scanner(include_deleted_rows=True, filter="a < 30").to_table()
2300+
2301+
22642302
def test_custom_commit_lock(tmp_path: Path):
22652303
called_lock = False
22662304
called_release = False

python/python/tests/test_scalar_index.py

+5
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ def test_full_text_search(dataset, with_position):
272272
for row in results:
273273
assert query in row.as_py()
274274

275+
with pytest.raises(ValueError, match="Cannot include deleted rows"):
276+
dataset.to_table(
277+
with_row_id=True, full_text_query=query, include_deleted_rows=True
278+
)
279+
275280

276281
def test_filter_with_fts_index(dataset):
277282
dataset.create_scalar_index("doc", index_type="INVERTED", with_position=False)

python/python/tests/test_vector_index.py

+13
Original file line numberDiff line numberDiff line change
@@ -1121,6 +1121,19 @@ def test_optimize_indices(indexed_dataset):
11211121
assert len(indices) == 2
11221122

11231123

1124+
def test_no_include_deleted_rows(indexed_dataset):
1125+
with pytest.raises(ValueError, match="Cannot include deleted rows"):
1126+
indexed_dataset.to_table(
1127+
nearest={
1128+
"column": "vector",
1129+
"q": np.random.randn(128),
1130+
"k": 10,
1131+
},
1132+
with_row_id=True,
1133+
include_deleted_rows=True,
1134+
)
1135+
1136+
11241137
def test_drop_indices(indexed_dataset):
11251138
idx_name = indexed_dataset.list_indices()[0]["name"]
11261139

python/src/dataset.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ impl Dataset {
486486
}
487487

488488
#[allow(clippy::too_many_arguments)]
489-
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None))]
489+
#[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None))]
490490
fn scanner(
491491
self_: PyRef<'_, Self>,
492492
columns: Option<Vec<String>>,
@@ -510,6 +510,7 @@ impl Dataset {
510510
full_text_query: Option<&Bound<'_, PyDict>>,
511511
late_materialization: Option<PyObject>,
512512
use_scalar_index: Option<bool>,
513+
include_deleted_rows: Option<bool>,
513514
) -> PyResult<Scanner> {
514515
let mut scanner: LanceScanner = self_.ds.scan();
515516
match (columns, columns_with_transform) {
@@ -608,6 +609,10 @@ impl Dataset {
608609
scanner.fast_search();
609610
}
610611

612+
if let Some(true) = include_deleted_rows {
613+
scanner.include_deleted_rows();
614+
}
615+
611616
if let Some(fragments) = fragments {
612617
let fragments = fragments
613618
.into_iter()

rust/lance/src/dataset/scanner.rs

+50-1
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,9 @@ pub struct Scanner {
334334
/// This is essentially a weak consistency search. Users can run index or optimize index
335335
/// to make the index catch up with the latest data.
336336
fast_search: bool,
337+
338+
/// If true, the scanner will emit deleted rows
339+
include_deleted_rows: bool,
337340
}
338341

339342
fn escape_column_name(name: &str) -> String {
@@ -372,6 +375,7 @@ impl Scanner {
372375
fragments: None,
373376
fast_search: false,
374377
use_scalar_index: true,
378+
include_deleted_rows: false,
375379
}
376380
}
377381

@@ -551,6 +555,21 @@ impl Scanner {
551555
self
552556
}
553557

558+
/// Include deleted rows
559+
///
560+
/// These are rows that have been deleted from the dataset but are still present in the
561+
/// underlying storage. These rows will have the `_rowid` column set to NULL. The other columns
562+
/// (including _rowaddr) will keep the values stored on disk.
563+
///
564+
/// This can be useful for generating aligned fragments or debugging
565+
///
566+
/// Note: when entire fragments are deleted, the scanner will not emit any rows for that fragment
567+
/// since the fragment is no longer present in the dataset.
568+
pub fn include_deleted_rows(&mut self) -> &mut Self {
569+
self.include_deleted_rows = true;
570+
self
571+
}
572+
554573
/// Set the I/O buffer size
555574
///
556575
/// This is the amount of RAM that will be reserved for holding I/O received from
@@ -1235,6 +1254,14 @@ impl Scanner {
12351254
location: location!(),
12361255
});
12371256
}
1257+
1258+
if self.include_deleted_rows && !self.with_row_id {
1259+
return Err(Error::InvalidInput {
1260+
source: "include_deleted_rows is set but with_row_id is false".into(),
1261+
location: location!(),
1262+
});
1263+
}
1264+
12381265
if let Some(first_blob_col) = self
12391266
.projection_plan
12401267
.physical_schema
@@ -1318,6 +1345,13 @@ impl Scanner {
13181345
// Stage 1: source (either a (K|A)NN search, a full text search, or a (full|indexed) scan)
13191346
let mut plan: Arc<dyn ExecutionPlan> = match (&self.nearest, &self.full_text_query) {
13201347
(Some(_), None) => {
1348+
if self.include_deleted_rows {
1349+
return Err(Error::InvalidInput {
1350+
source: "Cannot include deleted rows in a nearest neighbor search".into(),
1351+
location: location!(),
1352+
});
1353+
}
1354+
13211355
// The source is a nearest neighbor search
13221356
if self.prefilter {
13231357
// If we are prefiltering then the knn node will take care of the filter
@@ -1332,6 +1366,13 @@ impl Scanner {
13321366
}
13331367
}
13341368
(None, Some(query)) => {
1369+
if self.include_deleted_rows {
1370+
return Err(Error::InvalidInput {
1371+
source: "Cannot include deleted rows in an FTS search".into(),
1372+
location: location!(),
1373+
});
1374+
}
1375+
13351376
// The source is an FTS search
13361377
if self.prefilter {
13371378
// If we are prefiltering then the fts node will take care of the filter
@@ -1357,6 +1398,14 @@ impl Scanner {
13571398
} else {
13581399
self.use_stats
13591400
};
1401+
1402+
if filter_plan.index_query.is_some() && self.include_deleted_rows {
1403+
return Err(Error::InvalidInput {
1404+
source: "Cannot include deleted rows in a scalar indexed scan".into(),
1405+
location: location!(),
1406+
});
1407+
}
1408+
13601409
match (
13611410
filter_plan.index_query.is_some(),
13621411
filter_plan.refine_expr.is_some(),
@@ -1406,7 +1455,7 @@ impl Scanner {
14061455
self.scan(
14071456
with_row_id,
14081457
self.with_row_address,
1409-
false,
1458+
self.include_deleted_rows,
14101459
scan_range,
14111460
eager_schema,
14121461
)

0 commit comments

Comments
 (0)