perf: parallelize indexing partitions #3303

Merged · 1 commit · Jan 3, 2025

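This change speeds up vector index builds by constructing IVF partitions concurrently instead of one at a time: `build_partitions` now maps every partition to a future and drives them with `stream::iter(...).buffered(get_num_compute_intensive_cpus())`, so one build per compute-intensive CPU can be in flight while results still come back in submission order. To make the per-partition state movable into those futures, `build_partition` and the new `take_partition_batches`/`take_vectors` helpers become associated functions taking owned or borrowed arguments, and the shuffle reader is shared via `Arc`. A minimal, self-contained sketch of the concurrency pattern, where `build_one` and the error type are illustrative stand-ins rather than the PR's actual helpers:

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Stand-in for the real per-partition build work (illustrative only).
async fn build_one(partition: usize) -> std::io::Result<(usize, usize)> {
    Ok((partition, partition))
}

async fn build_all(
    partitions: Vec<usize>,
    parallelism: usize,
) -> std::io::Result<Vec<(usize, usize)>> {
    stream::iter(partitions.into_iter().map(build_one))
        // At most `parallelism` builds run concurrently; results are yielded
        // in submission order, matching how the PR maps sizes back by index.
        .buffered(parallelism)
        .try_collect()
        .await
}
```
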
202 changes: 128 additions & 74 deletions rust/lance/src/index/vector/builder.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
use arrow::array::AsArray;
use arrow_array::{RecordBatch, UInt64Array};
use futures::prelude::stream::{StreamExt, TryStreamExt};
use futures::stream;
use futures::{stream, FutureExt};
use itertools::Itertools;
use lance_arrow::RecordBatchExt;
use lance_core::cache::FileMetadataCache;
@@ -84,7 +84,7 @@ pub struct IvfIndexBuilder<S: IvfSubIndex, Q: Quantization> {
// fields will be set during build
ivf: Option<IvfModel>,
quantizer: Option<Q>,
shuffle_reader: Option<Box<dyn ShuffleReader>>,
shuffle_reader: Option<Arc<dyn ShuffleReader>>,
partition_sizes: Vec<(usize, usize)>,

// fields for merging indices / remapping
@@ -412,7 +412,7 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
Some(Err(e)) => panic!("do this better: error reading first batch: {:?}", e),
None => {
log::info!("no data to shuffle");
self.shuffle_reader = Some(Box::new(IvfShufflerReader::new(
self.shuffle_reader = Some(Arc::new(IvfShufflerReader::new(
Arc::new(self.store.clone()),
self.temp_dir.clone(),
vec![0; ivf.num_partitions()],
@@ -427,18 +427,30 @@
schema,
transformed_stream,
)))
.await?,
.await?
.into(),
);

Ok(self)
}

async fn build_partitions(&mut self) -> Result<&mut Self> {
let dataset = self.dataset.as_ref().ok_or(Error::invalid_input(
"dataset not set before building partitions",
location!(),
))?;
let ivf = self.ivf.as_ref().ok_or(Error::invalid_input(
"IVF not set before building partitions",
location!(),
))?;

let quantizer = self.quantizer.clone().ok_or(Error::invalid_input(
"quantizer not set before building partition",
location!(),
))?;
let sub_index_params = self.sub_index_params.clone().ok_or(Error::invalid_input(
"sub index params not set before building partition",
location!(),
))?;
let reader = self.shuffle_reader.as_ref().ok_or(Error::invalid_input(
"shuffle reader not set before building partitions",
location!(),
@@ -454,77 +466,78 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
.map(|(idx, _)| idx)
.collect::<Vec<_>>();

let dataset = Arc::new(dataset.clone());
let reader = reader.clone();
let existing_indices = Arc::new(self.existing_indices.clone());
let distance_type = self.distance_type;
let mut partition_sizes = vec![(0, 0); ivf.num_partitions()];
for (i, &partition) in partition_build_order.iter().enumerate() {
log::info!(
"building partition {}, progress {}/{}",
partition,
i + 1,
ivf.num_partitions(),
);
let mut batches = Vec::new();
for existing_index in self.existing_indices.iter() {
let existing_index = existing_index
.as_any()
.downcast_ref::<IVFIndex<S, Q>>()
.ok_or(Error::invalid_input(
"existing index is not IVF index",
location!(),
))?;

let part_storage = existing_index.load_partition_storage(partition).await?;
batches.extend(
self.take_vectors(part_storage.row_ids().cloned().collect_vec().as_ref())
.await?,
);
}
let build_iter = partition_build_order.iter().map(|&partition| {
let dataset = dataset.clone();
let reader = reader.clone();
let existing_indices = existing_indices.clone();
let column = self.column.clone();
let store = self.store.clone();
let temp_dir = self.temp_dir.clone();
let quantizer = quantizer.clone();
let sub_index_params = sub_index_params.clone();
async move {
let batches = Self::take_partition_batches(
partition,
existing_indices.as_ref(),
reader.as_ref(),
dataset.as_ref(),
&column,
&store,
)
.await?;

match reader.partition_size(partition)? {
0 => continue,
_ => {
let partition_data =
reader.read_partition(partition).await?.ok_or(Error::io(
format!("partition {} is empty", partition).as_str(),
location!(),
))?;
batches.extend(partition_data.try_collect::<Vec<_>>().await?);
let num_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
if num_rows == 0 {
return Ok((0, 0));
}
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;

let num_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
if num_rows == 0 {
continue;
Self::build_partition(
&temp_dir,
column,
distance_type,
quantizer,
sub_index_params,
batch,
partition,
)
.await
}
let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?;
let sizes = self.build_partition(partition, &batch).await?;
partition_sizes[partition] = sizes;
log::info!(
"partition {} built, progress {}/{}",
partition,
i + 1,
ivf.num_partitions()
);
});
let results = stream::iter(build_iter)
.buffered(get_num_compute_intensive_cpus())
.try_collect::<Vec<_>>()
.boxed()
.await?;

for (i, result) in results.into_iter().enumerate() {
partition_sizes[partition_build_order[i]] = result;
}

self.partition_sizes = partition_sizes;
Ok(self)
}

async fn build_partition(&self, part_id: usize, batch: &RecordBatch) -> Result<(usize, usize)> {
let quantizer = self.quantizer.clone().ok_or(Error::invalid_input(
"quantizer not set before building partition",
location!(),
))?;
let sub_index_params = self.sub_index_params.clone().ok_or(Error::invalid_input(
"sub index params not set before building partition",
location!(),
))?;

async fn build_partition(
temp_dir: &Path,
column: String,
distance_type: DistanceType,
quantizer: Q,
sub_index_params: S::BuildParams,
batch: RecordBatch,
part_id: usize,
) -> Result<(usize, usize)> {
let local_store = ObjectStore::local();
// build quantized vector storage
let storage_len = {
let storage = StorageBuilder::new(self.column.clone(), self.distance_type, quantizer)
.build(batch)?;
let path = self.temp_dir.child(format!("storage_part{}", part_id));
let storage =
StorageBuilder::new(column.clone(), distance_type, quantizer).build(&batch)?;
let path = temp_dir.child(format!("storage_part{}", part_id));
let batches = storage.to_batches()?;
FileWriter::create_file_with_batches(
&local_store,
@@ -538,10 +551,10 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>

// build the sub index, with in-memory storage
let index_len = {
let vectors = batch[&self.column].as_fixed_size_list();
let flat_storage = FlatFloatStorage::new(vectors.clone(), self.distance_type);
let vectors = batch[&column].as_fixed_size_list();
let flat_storage = FlatFloatStorage::new(vectors.clone(), distance_type);
let sub_index = S::index_vectors(&flat_storage, sub_index_params)?;
let path = self.temp_dir.child(format!("index_part{}", part_id));
let path = temp_dir.child(format!("index_part{}", part_id));
let index_batch = sub_index.to_batch()?;
let schema = index_batch.schema().as_ref().try_into()?;
FileWriter::create_file_with_batches(
@@ -557,6 +570,47 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
Ok((storage_len, index_len))
}

async fn take_partition_batches(
part_id: usize,
existing_indices: &[Arc<dyn VectorIndex>],
reader: &dyn ShuffleReader,
dataset: &Dataset,
column: &str,
store: &ObjectStore,
) -> Result<Vec<RecordBatch>> {
let mut batches = Vec::new();
for existing_index in existing_indices.iter() {
let existing_index = existing_index
.as_any()
.downcast_ref::<IVFIndex<S, Q>>()
.ok_or(Error::invalid_input(
"existing index is not IVF index",
location!(),
))?;

let part_storage = existing_index.load_partition_storage(part_id).await?;
batches.extend(
Self::take_vectors(
dataset,
column,
store,
part_storage.row_ids().cloned().collect_vec().as_ref(),
)
.await?,
);
}

if reader.partition_size(part_id)? > 0 {
let partition_data = reader.read_partition(part_id).await?.ok_or(Error::io(
format!("partition {} is empty", part_id).as_str(),
location!(),
))?;
batches.extend(partition_data.try_collect::<Vec<_>>().await?);
}

Ok(batches)
}

async fn merge_partitions(&mut self) -> Result<()> {
let ivf = self.ivf.as_ref().ok_or(Error::invalid_input(
"IVF not set before merge partitions",
@@ -707,16 +761,16 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>

// take vectors from the dataset
// used for reading vectors from existing indices
async fn take_vectors(&self, row_ids: &[u64]) -> Result<Vec<RecordBatch>> {
let dataset = self.dataset.as_ref().ok_or(Error::invalid_input(
"dataset not set before taking vectors",
location!(),
))?;
let column = self.column.clone();
let projection = Arc::new(dataset.schema().project(&[column.as_str()])?);
async fn take_vectors(
dataset: &Dataset,
column: &str,
store: &ObjectStore,
row_ids: &[u64],
) -> Result<Vec<RecordBatch>> {
let projection = Arc::new(dataset.schema().project(&[column])?);
// arrow uses i32 for index, so we chunk the row ids to avoid large batch causing overflow
let mut batches = Vec::new();
for chunk in row_ids.chunks(self.store.block_size()) {
for chunk in row_ids.chunks(store.block_size()) {
let batch = dataset
.take_rows(chunk, ProjectionRequest::Schema(projection.clone()))
.await?;
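
One prerequisite for the fan-out above: the builder's `shuffle_reader` field moves from `Box<dyn ShuffleReader>` to `Arc<dyn ShuffleReader>`. A `Box` of a trait object cannot be cloned into each concurrent future, while cloning an `Arc` is just a reference-count bump, so every partition build gets its own cheap handle to the shared reader. A minimal sketch, with a stub trait standing in for the real `ShuffleReader`:

```rust
use std::sync::Arc;

// Stub standing in for the real ShuffleReader trait (illustrative only).
trait ShuffleReader: Send + Sync {}

fn hand_out_readers(reader: Arc<dyn ShuffleReader>, num_partitions: usize) {
    for _ in 0..num_partitions {
        // Each concurrently running build future captures its own handle;
        // a Box<dyn ShuffleReader> would have required a deep copy, which
        // trait objects cannot provide.
        let _handle = Arc::clone(&reader);
        // ... move `_handle` into the per-partition future ...
    }
}
```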