Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: build inverted index in a distributed manner #3452

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
update test
chenkovsky committed Feb 13, 2025
commit 98c5880a4c673d431ddcbd760a6c5936b141be4f
41 changes: 14 additions & 27 deletions python/python/tests/test_commit_index.py
Original file line number Diff line number Diff line change
@@ -130,36 +130,13 @@ def test_indexed_unindexed_fragments(tmp_path):
read_version=ds.version,
)

unindexed_fragments = ds.unindexed_fragments("text")
unindexed_fragments = ds.unindexed_fragments("text_idx")
assert len(unindexed_fragments) == 1
assert unindexed_fragments[0].id == frags[1].fragment_id

indexed_fragments = [f for fs in ds.indexed_fragments("text") for f in fs]
indexed_fragments = [f for fs in ds.indexed_fragments("text_idx") for f in fs]
assert len(indexed_fragments) == 1
assert unindexed_fragments[0].id == frags[0].fragment_id


index = ds.create_scalar_index("text", "INVERTED", fragment_ids=[frags[1].fragment_id])
assert isinstance(index, dict)

indices = [index]
create_index_op = lance.LanceOperation.CreateIndex(
new_indices=indices,
removed_indices=[],
)
ds = lance.LanceDataset.commit(
ds.uri,
create_index_op,
read_version=ds.version,
)

unindexed_fragments = ds.unindexed_fragments("text")
assert len(unindexed_fragments) == 0

indexed_fragments = [f for fs in ds.indexed_fragments("text") for f in fs]
assert len(indexed_fragments) == 2
assert unindexed_fragments[0].id == frags[0].fragment_id
assert unindexed_fragments[1].id == frags[1].fragment_id
assert indexed_fragments[0].id == frags[0].fragment_id


def test_commit_index2(tmp_path):
@@ -184,7 +161,9 @@ def test_commit_index2(tmp_path):
)

indices = []
for f in ds.get_fragments():
frags = [f for f in ds.get_fragments()]

for f in frags:
index = ds.create_scalar_index("sentiment", "BITMAP", fragment_ids=[f.fragment_id])
assert isinstance(index, dict)
indices.append(index)
@@ -200,6 +179,14 @@ def test_commit_index2(tmp_path):
read_version=ds.version,
)

unindexed_fragments = ds.unindexed_fragments("text_idx")
assert len(unindexed_fragments) == 0

indexed_fragments = [f for fs in ds.indexed_fragments("text_idx") for f in fs]
assert len(indexed_fragments) == 2
assert indexed_fragments[0].id == frags[0].fragment_id
assert indexed_fragments[1].id == frags[1].fragment_id

results = ds.to_table(
full_text_query="puppy",
filter="sentiment='positive'",
19 changes: 8 additions & 11 deletions python/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use lance::dataset::transaction::{
use lance::datatypes::Schema;
use lance_table::format::{DataFile, Fragment, Index};
use pyo3::exceptions::PyValueError;
use pyo3::types::{PyDict, PyNone, PySet};
use pyo3::types::{PyDict, PyNone};
use pyo3::{intern, prelude::*};
use pyo3::{Bound, FromPyObject, PyAny, PyObject, PyResult, Python, ToPyObject};
use uuid::Uuid;
@@ -45,16 +45,13 @@ impl ToPyObject for PyLance<&DataReplacementGroup> {

impl FromPyObject<'_> for PyLance<Index> {
fn extract_bound(ob: &Bound<'_, PyAny>) -> PyResult<Self> {
let uuid = ob.getattr("uuid")?.extract()?;
let name = ob.getattr("name")?.extract()?;
let fields = ob.getattr("fields")?.extract()?;
let dataset_version = ob.getattr("dataset_version")?.extract()?;

let fragment_ids = ob.getattr("fragment_ids")?;
let fragment_ids_ref: &Bound<'_, PySet> = fragment_ids.downcast()?;
let fragment_ids = fragment_ids_ref
.into_iter()
.map(|id| id.extract())
let uuid = ob.get_item("uuid")?.extract()?;
let name = ob.get_item("name")?.extract()?;
let fields = ob.get_item("fields")?.extract()?;
let dataset_version = ob.get_item("dataset_version")?.extract()?;

let fragment_ids = ob.get_item("fragment_ids")?;
let fragment_ids = fragment_ids.iter()?.map(|id| id?.extract::<u32>())
.collect::<PyResult<Vec<u32>>>()?;
let fragment_bitmap = Some(fragment_ids.into_iter().collect());
Ok(Self(Index {
8 changes: 6 additions & 2 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
@@ -236,6 +236,11 @@ impl DatasetIndexExt for Dataset {
}
}

let fragment_bitmap = match &fragment_ids {
Some(fragment_ids) => Some(fragment_ids.iter().collect()),
None => Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
};

let index_id = Uuid::new_v4();
let index_details: prost_types::Any = match (index_type, params.index_name()) {
(
@@ -325,13 +330,12 @@ impl DatasetIndexExt for Dataset {
});
}
};

Ok(IndexMetadata {
uuid: index_id,
name: index_name,
fields: vec![field.id],
dataset_version: self.manifest.version,
fragment_bitmap: Some(self.get_fragments().iter().map(|f| f.id() as u32).collect()),
fragment_bitmap,
index_details: Some(index_details),
})
}