From be1974d41e4d72a7b73a1e6e5adc0504224b6e01 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 28 Feb 2025 15:56:57 +0800 Subject: [PATCH 01/22] feat: record loss for IVF and KMeans Signed-off-by: BubbleCal --- rust/lance-index/src/vector/ivf/storage.rs | 14 ++++++++++++++ rust/lance-linalg/src/kmeans.rs | 13 +++++++++++++ rust/lance/src/index/vector/ivf/v2.rs | 1 + 3 files changed, 28 insertions(+) diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index 3c8ebc5771..2bace2b3f4 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -34,6 +34,9 @@ pub struct IvfModel { /// Number of vectors in each partition. pub lengths: Vec, + + /// Kmeans loss + pub loss: Option, } impl DeepSizeOf for IvfModel { @@ -53,6 +56,7 @@ impl IvfModel { centroids: None, offsets: vec![], lengths: vec![], + loss: None, } } @@ -61,6 +65,7 @@ impl IvfModel { centroids: Some(centroids), offsets: vec![], lengths: vec![], + loss: None, } } @@ -88,6 +93,14 @@ impl IvfModel { self.lengths[part] as usize } + pub fn num_rows(&self) -> u64 { + self.lengths.iter().map(|x| *x as u64).sum() + } + + pub fn avg_loss(&self) -> Option { + self.loss.map(|loss| loss / self.num_rows() as f64) + } + /// Use the query vector to find `nprobes` closest partitions. pub fn find_partitions( &self, @@ -215,6 +228,7 @@ impl TryFrom for IvfModel { centroids, offsets, lengths: proto.lengths, + loss: None, }) } } diff --git a/rust/lance-linalg/src/kmeans.rs b/rust/lance-linalg/src/kmeans.rs index a318a92b6c..07e9018450 100644 --- a/rust/lance-linalg/src/kmeans.rs +++ b/rust/lance-linalg/src/kmeans.rs @@ -103,6 +103,9 @@ pub struct KMeans { /// How to calculate distance between two vectors. pub distance_type: DistanceType, + + /// The loss of the last training. + pub loss: f64, } /// Randomly initialize kmeans centroids. 
@@ -127,6 +130,7 @@ fn kmeans_random_init( centroids: Arc::new(centroids), dimension, distance_type, + loss: f64::MAX, } } @@ -191,6 +195,7 @@ pub trait KMeansAlgo { k: usize, membership: &[Option], distance_type: DistanceType, + loss: f64, ) -> KMeans; } @@ -245,6 +250,7 @@ where k: usize, membership: &[Option], distance_type: DistanceType, + loss: f64, ) -> KMeans { let mut cluster_cnts = vec![0_u64; k]; let mut new_centroids = vec![T::Native::zero(); k * dimension]; @@ -293,6 +299,7 @@ where centroids: Arc::new(PrimitiveArray::::from_iter_values(new_centroids)), dimension, distance_type, + loss, } } } @@ -337,6 +344,7 @@ impl KMeansAlgo for KModeAlgo { k: usize, membership: &[Option], distance_type: DistanceType, + loss: f64, ) -> KMeans { assert_eq!(distance_type, DistanceType::Hamming); @@ -379,6 +387,7 @@ impl KMeansAlgo for KModeAlgo { centroids: Arc::new(UInt8Array::from(centroids)), dimension, distance_type, + loss, } } } @@ -389,6 +398,7 @@ impl KMeans { centroids: arrow_array::array::new_empty_array(&DataType::Float32), dimension, distance_type, + loss: f64::MAX, } } @@ -398,6 +408,7 @@ impl KMeans { centroids: ArrayRef, dimension: usize, distance_type: DistanceType, + loss: f64, ) -> Self { assert!(matches!( centroids.data_type(), @@ -407,6 +418,7 @@ impl KMeans { centroids, dimension, distance_type, + loss, } } @@ -496,6 +508,7 @@ impl KMeans { k, &membership, params.distance_type, + last_loss, ); last_membership = Some(membership); if (loss - last_loss).abs() / last_loss < params.tolerance { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index c834612b6e..e72153f10e 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -781,6 +781,7 @@ mod tests { let gt_set = gt.iter().map(|r| r.1).collect::>(); let recall = row_ids.intersection(&gt_set).count() as f32 / k as f32; + println!("recall: {}", recall); assert!( recall >= recall_requirement, "recall: {}\n results: 
{:?}\n\ngt: {:?}", From b0e1c24b97aac983aff99110ae14110a3743cfae Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 28 Feb 2025 16:28:25 +0800 Subject: [PATCH 02/22] fix Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index b211adf2e7..2b2608b070 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -1403,6 +1403,7 @@ pub(crate) async fn remap_index_file( centroids: index.ivf.centroids.clone(), offsets: Vec::with_capacity(index.ivf.offsets.len()), lengths: Vec::with_capacity(index.ivf.lengths.len()), + loss: index.ivf.loss, }; while let Some(write_task) = task_stream.try_next().await? { write_task.write(&mut writer, &mut ivf).await?; From 2973164cefcef0f91f427e60684940775d4887ab Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 28 Feb 2025 18:04:32 +0800 Subject: [PATCH 03/22] impl Signed-off-by: BubbleCal --- java/core/lance-jni/src/utils.rs | 2 - rust/lance-index/src/vector.rs | 2 +- rust/lance-index/src/vector/hnsw/index.rs | 2 +- rust/lance-index/src/vector/ivf/builder.rs | 9 ++-- rust/lance-index/src/vector/kmeans.rs | 10 ++-- rust/lance-index/src/vector/pq/builder.rs | 1 + rust/lance-linalg/src/kmeans.rs | 33 ++++++++---- rust/lance/src/index/vector/builder.rs | 35 ++++++++++--- rust/lance/src/index/vector/ivf.rs | 58 ++++++++++++++++++---- rust/lance/src/index/vector/ivf/v2.rs | 4 +- rust/lance/src/index/vector/pq.rs | 2 +- 11 files changed, 114 insertions(+), 44 deletions(-) diff --git a/java/core/lance-jni/src/utils.rs b/java/core/lance-jni/src/utils.rs index 4a2d4ae529..73da3355f8 100644 --- a/java/core/lance-jni/src/utils.rs +++ b/java/core/lance-jni/src/utils.rs @@ -189,7 +189,6 @@ pub fn get_index_params( env.get_int_as_usize_from_method(&ivf_params_obj, "getShufflePartitionBatches")?; let shuffle_partition_concurrency = env.get_int_as_usize_from_method(&ivf_params_obj, 
"getShufflePartitionConcurrency")?; - let use_residual = env.get_boolean_from_method(&ivf_params_obj, "useResidual")?; let ivf_params = IvfBuildParams { num_partitions, @@ -197,7 +196,6 @@ pub fn get_index_params( sample_rate, shuffle_partition_batches, shuffle_partition_concurrency, - use_residual, ..Default::default() }; stages.push(StageParams::Ivf(ivf_params)); diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index 6717a59a4c..6c7bbdb273 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -222,7 +222,7 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index { /// The metric type of this vector index. fn metric_type(&self) -> DistanceType; - fn ivf_model(&self) -> IvfModel; + fn ivf_model(&self) -> &IvfModel; fn quantizer(&self) -> Quantizer; /// the index type of this vector index. diff --git a/rust/lance-index/src/vector/hnsw/index.rs b/rust/lance-index/src/vector/hnsw/index.rs index 88778c1b1c..27a4a8e44a 100644 --- a/rust/lance-index/src/vector/hnsw/index.rs +++ b/rust/lance-index/src/vector/hnsw/index.rs @@ -304,7 +304,7 @@ impl VectorIndex for HNSWIndex { }) } - fn ivf_model(&self) -> IvfModel { + fn ivf_model(&self) -> &IvfModel { unimplemented!("only for IVF") } diff --git a/rust/lance-index/src/vector/ivf/builder.rs b/rust/lance-index/src/vector/ivf/builder.rs index e2e22aa5a6..bbc46beaeb 100644 --- a/rust/lance-index/src/vector/ivf/builder.rs +++ b/rust/lance-index/src/vector/ivf/builder.rs @@ -28,6 +28,10 @@ pub struct IvfBuildParams { /// Use provided IVF centroids. pub centroids: Option>, + /// Retrain centroids. + /// If true, the centroids will be retrained based on provided `centroids`. + pub retrain: bool, + pub sample_rate: usize, /// Precomputed partitions file (row_id -> partition_id) @@ -45,9 +49,6 @@ pub struct IvfBuildParams { pub shuffle_partition_concurrency: usize, - /// Use residual vectors to build sub-vector. 
- pub use_residual: bool, - /// Storage options used to load precomputed partitions. pub storage_options: Option>, } @@ -58,12 +59,12 @@ impl Default for IvfBuildParams { num_partitions: 32, max_iters: 50, centroids: None, + retrain: false, sample_rate: 256, // See faiss precomputed_partitions_file: None, precomputed_shuffle_buffers: None, shuffle_partition_batches: 1024 * 10, shuffle_partition_concurrency: 2, - use_residual: true, storage_options: None, } } diff --git a/rust/lance-index/src/vector/kmeans.rs b/rust/lance-index/src/vector/kmeans.rs index 5f73a278bc..66d4f84189 100644 --- a/rust/lance-index/src/vector/kmeans.rs +++ b/rust/lance-index/src/vector/kmeans.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::sync::Arc; + use arrow_array::{types::ArrowPrimitiveType, ArrayRef, FixedSizeListArray, PrimitiveArray}; use lance_arrow::FixedSizeListArrayExt; use log::info; @@ -16,6 +18,7 @@ use lance_linalg::{ /// Train KMeans model and returns the centroids of each cluster. 
#[allow(clippy::too_many_arguments)] pub fn train_kmeans( + centroids: Option>, array: &[T::Native], dimension: usize, k: usize, @@ -57,12 +60,7 @@ where PrimitiveArray::::from(array.to_vec()) }; - let params = KMeansParams { - max_iters: max_iterations, - distance_type, - redos, - ..Default::default() - }; + let params = KMeansParams::new(centroids, max_iterations, redos, distance_type); let data = FixedSizeListArray::try_new_from_values(data, dimension as i32)?; let model = KMeans::new_with_params(&data, k, &params)?; Ok(model.centroids.clone()) diff --git a/rust/lance-index/src/vector/pq/builder.rs b/rust/lance-index/src/vector/pq/builder.rs index a7281c0bab..48cfad5d65 100644 --- a/rust/lance-index/src/vector/pq/builder.rs +++ b/rust/lance-index/src/vector/pq/builder.rs @@ -111,6 +111,7 @@ impl PQBuildParams { .map(|sub_vec| { let rng = rand::rngs::SmallRng::from_entropy(); train_kmeans::( + None, &sub_vec, sub_vector_dimension, num_centroids, diff --git a/rust/lance-linalg/src/kmeans.rs b/rust/lance-linalg/src/kmeans.rs index 07e9018450..7b5818d132 100644 --- a/rust/lance-linalg/src/kmeans.rs +++ b/rust/lance-linalg/src/kmeans.rs @@ -41,10 +41,10 @@ use crate::{ use crate::{Error, Result}; /// KMean initialization method. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq)] pub enum KMeanInit { Random, - KMeanPlusPlus, + Incremental(Arc), } /// KMean Training Parameters @@ -80,11 +80,21 @@ impl Default for KMeansParams { } impl KMeansParams { - /// Create a new KMeansParams with cosine distance. - #[allow(dead_code)] - fn cosine() -> Self { + pub fn new( + centroids: Option>, + max_iters: u32, + redos: usize, + distance_type: DistanceType, + ) -> Self { + let init = match centroids { + Some(centroids) => KMeanInit::Incremental(centroids), + None => KMeanInit::Random, + }; Self { - distance_type: DistanceType::Cosine, + max_iters, + redos, + distance_type, + init, ..Default::default() } } @@ -474,7 +484,7 @@ impl KMeans { // TODO: use seed for Rng. 
let rng = SmallRng::from_entropy(); for redo in 1..=params.redos { - let mut kmeans: Self = match params.init { + let mut kmeans: Self = match &params.init { KMeanInit::Random => Self::init_random::( data.values(), dimension, @@ -482,9 +492,12 @@ impl KMeans { rng.clone(), params.distance_type, ), - KMeanInit::KMeanPlusPlus => { - unimplemented!() - } + KMeanInit::Incremental(centroids) => Self::with_centroids( + centroids.values().clone(), + dimension, + params.distance_type, + f64::MAX, + ), }; let mut loss = f64::MAX; diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 3cf5df3c2a..7a2f91d724 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -175,7 +175,7 @@ impl IvfIndexBuilder sub_index_params: None, _temp_dir: temp_dir, temp_dir: temp_dir_path, - ivf: Some(ivf_index.ivf_model()), + ivf: Some(ivf_index.ivf_model().clone()), quantizer: Some(ivf_index.quantizer().try_into()?), shuffle_reader: None, partition_sizes: Vec::new(), @@ -186,9 +186,8 @@ impl IvfIndexBuilder // build the index with the all data in the dataset, pub async fn build(&mut self) -> Result<()> { // step 1. 
train IVF & quantizer - if self.ivf.is_none() { - self.with_ivf(self.load_or_build_ivf().await?); - } + self.with_ivf(self.load_or_build_ivf().await?); + if self.quantizer.is_none() { self.with_quantizer(self.load_or_build_quantizer().await?); } @@ -285,10 +284,32 @@ impl IvfIndexBuilder "IVF build params not set", location!(), ))?; - let dim = utils::get_vector_dim(dataset.schema(), &self.column)?; - super::build_ivf_model(dataset, &self.column, dim, self.distance_type, ivf_params).await - // TODO: load ivf model + let dim = utils::get_vector_dim(dataset.schema(), &self.column)?; + match &self.ivf { + Some(ivf) => { + if self.existing_indices.is_empty() { + return Ok(ivf.clone()); + } else { + // retrain the IVF model with the existing indices + let mut ivf_params = ivf_params.clone(); + ivf_params.retrain = true; + + super::build_ivf_model( + dataset, + &self.column, + dim, + self.distance_type, + &ivf_params, + ) + .await + } + } + None => { + super::build_ivf_model(dataset, &self.column, dim, self.distance_type, ivf_params) + .await + } + } } async fn load_or_build_quantizer(&self) -> Result { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 2b2608b070..86f846937c 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -70,6 +70,7 @@ use lance_linalg::{ distance::Normalize, kernels::{normalize_arrow, normalize_fsl}, }; +use lazy_static::lazy_static; use log::info; use object_store::path::Path; use rand::{rngs::SmallRng, SeedableRng}; @@ -93,6 +94,13 @@ use crate::{ session::Session, }; +lazy_static! 
{ + static ref AVG_LOSS_RETRAIN_THRESHOLD: f64 = std::env::var("LANCE_AVG_LOSS_RETRAIN_THRESHOLD") + .unwrap_or("1.1".to_string()) + .parse() + .expect("LANCE_AVG_LOSS_RETRAIN_THRESHOLD must be a float number"); +} + pub mod builder; pub mod io; pub mod v2; @@ -358,13 +366,34 @@ pub(crate) async fn optimize_vector_indices_v2( let num_partitions = ivf_model.num_partitions(); let index_type = existing_indices[0].sub_index_type(); + let original_avg_loss = ivf_model.avg_loss().unwrap_or_default(); + let total_loss = existing_indices + .iter() + .map(|idx| idx.ivf_model().loss.unwrap_or_default()) + .sum::(); + let total_rows = existing_indices + .iter() + .map(|idx| idx.ivf_model().num_rows()) + .sum::(); + let avg_loss = total_loss / total_rows as f64; + + const AVG_LOSS_THRESHOLD: f64 = 1.1; + let mut num_indices_to_merge = options.num_indices_to_merge; + if avg_loss >= original_avg_loss * AVG_LOSS_THRESHOLD { + info!( + "average loss {} of the indices is too high (> {} * {}), retrain the index", + avg_loss, original_avg_loss, AVG_LOSS_THRESHOLD + ); + num_indices_to_merge = existing_indices.len(); + } + let temp_dir = tempfile::tempdir()?; let temp_dir = temp_dir.path().to_str().unwrap().into(); let shuffler = Box::new(IvfShuffler::new(temp_dir, num_partitions)); - let start_pos = if options.num_indices_to_merge > existing_indices.len() { + let start_pos = if num_indices_to_merge > existing_indices.len() { 0 } else { - existing_indices.len() - options.num_indices_to_merge + existing_indices.len() - num_indices_to_merge }; let indices_to_merge = existing_indices[start_pos..].to_vec(); let merged_num = indices_to_merge.len(); @@ -379,7 +408,7 @@ pub(crate) async fn optimize_vector_indices_v2( shuffler, (), )? - .with_ivf(ivf_model) + .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) .shuffle_data(unindexed) @@ -397,7 +426,7 @@ pub(crate) async fn optimize_vector_indices_v2( shuffler, (), )? 
- .with_ivf(ivf_model) + .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) .shuffle_data(unindexed) @@ -418,7 +447,7 @@ pub(crate) async fn optimize_vector_indices_v2( // TODO: get the HNSW parameters from the existing indices HnswBuildParams::default(), )? - .with_ivf(ivf_model) + .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) .shuffle_data(unindexed) @@ -439,7 +468,7 @@ pub(crate) async fn optimize_vector_indices_v2( // TODO: get the HNSW parameters from the existing indices HnswBuildParams::default(), )? - .with_ivf(ivf_model) + .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) .shuffle_data(unindexed) @@ -941,8 +970,8 @@ impl VectorIndex for IVFIndex { }) } - fn ivf_model(&self) -> IvfModel { - self.ivf.clone() + fn ivf_model(&self) -> &IvfModel { + &self.ivf } fn quantizer(&self) -> Quantizer { @@ -1124,7 +1153,9 @@ pub async fn build_ivf_model( metric_type: MetricType, params: &IvfBuildParams, ) -> Result { - if let Some(centroids) = params.centroids.as_ref() { + let centroids = params.centroids.clone(); + if centroids.is_some() && !params.retrain { + let centroids = centroids.unwrap(); info!("Pre-computed IVF centroids is provided, skip IVF training"); if centroids.values().len() != params.num_partitions * dim { return Err(Error::Index { @@ -1162,7 +1193,7 @@ pub async fn build_ivf_model( info!("Start to train IVF model"); let start = std::time::Instant::now(); - let ivf = train_ivf_model(&training_data, mt, params).await?; + let ivf = train_ivf_model(centroids, &training_data, mt, params).await?; info!( "Trained IVF model in {:02} seconds", start.elapsed().as_secs_f32() @@ -1660,6 +1691,7 @@ async fn write_ivf_hnsw_file( } async fn do_train_ivf_model( + centroids: Option>, data: &[T::Native], dimension: usize, metric_type: MetricType, @@ -1672,6 +1704,7 @@ where let rng = 
SmallRng::from_entropy(); const REDOS: usize = 1; let centroids = lance_index::vector::kmeans::train_kmeans::( + centroids, data, dimension, params.num_partitions, @@ -1689,6 +1722,7 @@ where /// Train IVF partitions using kmeans. async fn train_ivf_model( + centroids: Option>, data: &FixedSizeListArray, distance_type: DistanceType, params: &IvfBuildParams, @@ -1702,6 +1736,7 @@ async fn train_ivf_model( match (values.data_type(), distance_type) { (DataType::Float16, _) => { do_train_ivf_model::( + centroids, values.as_primitive::().values(), dim, distance_type, @@ -1711,6 +1746,7 @@ async fn train_ivf_model( } (DataType::Float32, _) => { do_train_ivf_model::( + centroids, values.as_primitive::().values(), dim, distance_type, @@ -1720,6 +1756,7 @@ async fn train_ivf_model( } (DataType::Float64, _) => { do_train_ivf_model::( + centroids, values.as_primitive::().values(), dim, distance_type, @@ -1729,6 +1766,7 @@ async fn train_ivf_model( } (DataType::UInt8, DistanceType::Hamming) => { do_train_ivf_model::( + centroids, values.as_primitive::().values(), dim, distance_type, diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index e72153f10e..1d006ce413 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -523,8 +523,8 @@ impl VectorIndex for IVFInd } } - fn ivf_model(&self) -> IvfModel { - self.ivf.clone() + fn ivf_model(&self) -> &IvfModel { + &self.ivf } fn quantizer(&self) -> Quantizer { diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index a04cfd7018..06c8c13361 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -410,7 +410,7 @@ impl VectorIndex for PQIndex { Ok(()) } - fn ivf_model(&self) -> IvfModel { + fn ivf_model(&self) -> &IvfModel { unimplemented!("only for IVF") } fn quantizer(&self) -> Quantizer { From 83c093d2287ff3c24dd36b6c688bffa014ddbb3b Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 28 
Feb 2025 18:16:45 +0800 Subject: [PATCH 04/22] fix Signed-off-by: BubbleCal --- rust/lance/src/index/vector/fixture_test.rs | 2 +- rust/lance/src/session/index_extension.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index d13162da38..1e3eedbdb7 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -147,7 +147,7 @@ mod test { unimplemented!("only for SubIndex") } - fn ivf_model(&self) -> IvfModel { + fn ivf_model(&self) -> &IvfModel { unimplemented!("only for IVF") } fn quantizer(&self) -> Quantizer { diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index 3e7369c1c0..d9cfac9041 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -174,7 +174,7 @@ mod test { unimplemented!() } - fn ivf_model(&self) -> IvfModel { + fn ivf_model(&self) -> &IvfModel { unimplemented!() } fn quantizer(&self) -> Quantizer { From 2a3b358ae99d9940ccc6245814ce4c2b69597524 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 28 Feb 2025 18:23:53 +0800 Subject: [PATCH 05/22] fix Signed-off-by: BubbleCal --- python/src/indices.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/src/indices.rs b/python/src/indices.rs index d488a8fafa..e07dab1678 100644 --- a/python/src/indices.rs +++ b/python/src/indices.rs @@ -138,6 +138,7 @@ fn train_pq_model( centroids: Some(ivf_centroids), offsets: vec![], lengths: vec![], + loss: None, }; let codebook = RT.block_on( Some(py), @@ -357,6 +358,7 @@ pub fn load_shuffled_vectors( centroids: Some(ivf_centroids), offsets: vec![], lengths: vec![], + loss: None, }; let codebook = pq_codebook.0; From aa3be2a0bd22687a95cf171d56befcd648b7bd2e Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 6 Mar 2025 15:19:21 +0800 Subject: [PATCH 06/22] fix Signed-off-by: BubbleCal --- 
rust/lance/src/index/vector/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 5f3ad0ec29..be77cf6477 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -296,7 +296,7 @@ impl IvfIndexBuilder match &self.ivf { Some(ivf) => { if self.existing_indices.is_empty() { - return Ok(ivf.clone()); + Ok(ivf.clone()) } else { // retrain the IVF model with the existing indices let mut ivf_params = ivf_params.clone(); From a3b8dd3458355c64df3e1771a96b13f5b1c6b684 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 6 Mar 2025 15:29:41 +0800 Subject: [PATCH 07/22] fmt Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf/v2.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index dd17f94b87..217eb94b2f 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -803,7 +803,6 @@ mod tests { let gt_set = gt.iter().map(|r| r.1).collect::>(); let recall = row_ids.intersection(&gt_set).count() as f32 / k as f32; - println!("recall: {}", recall); assert!( recall >= recall_requirement, "recall: {}\n results: {:?}\n\ngt: {:?}", From 5ca3477def14c71a44747effbebc9a4c7231d277 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 6 Mar 2025 17:18:28 +0800 Subject: [PATCH 08/22] fix Signed-off-by: BubbleCal --- protos/index.proto | 3 +++ rust/lance-index/src/vector/ivf/storage.rs | 8 +++++--- rust/lance-index/src/vector/kmeans.rs | 6 +++--- rust/lance-index/src/vector/pq/builder.rs | 1 + rust/lance/src/index/vector/builder.rs | 12 ++++++------ rust/lance/src/index/vector/fixture_test.rs | 2 +- rust/lance/src/index/vector/ivf.rs | 18 +++++++++--------- rust/lance/src/index/vector/pq.rs | 2 +- 8 files changed, 29 insertions(+), 23 deletions(-) diff --git a/protos/index.proto b/protos/index.proto index 
0db16566d2..178c0b1c44 100644 --- a/protos/index.proto +++ b/protos/index.proto @@ -66,6 +66,9 @@ message IVF { // Tensor of centroids. `num_partitions * dimension` of float32s. Tensor centroids_tensor = 4; + + // KMeans loss. + double loss = 5; } // Product Quantization. diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index 2bace2b3f4..ce016cf07b 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -60,12 +60,12 @@ impl IvfModel { } } - pub fn new(centroids: FixedSizeListArray) -> Self { + pub fn new(centroids: FixedSizeListArray, loss: Option) -> Self { Self { centroids: Some(centroids), offsets: vec![], lengths: vec![], - loss: None, + loss, } } @@ -180,6 +180,7 @@ impl TryFrom<&IvfModel> for PbIvf { lengths, offsets: ivf.offsets.iter().map(|x| *x as u64).collect(), centroids_tensor: ivf.centroids.as_ref().map(|c| c.try_into()).transpose()?, + loss: ivf.loss.unwrap_or_default(), }) } } @@ -228,7 +229,7 @@ impl TryFrom for IvfModel { centroids, offsets, lengths: proto.lengths, - loss: None, + loss: Some(proto.loss), }) } } @@ -310,6 +311,7 @@ mod tests { lengths: vec![2, 2], offsets: vec![0, 2], centroids_tensor: None, + loss: 0.0, }; let ivf = IvfModel::try_from(pb_ivf).unwrap(); diff --git a/rust/lance-index/src/vector/kmeans.rs b/rust/lance-index/src/vector/kmeans.rs index 66d4f84189..8f6e451bdd 100644 --- a/rust/lance-index/src/vector/kmeans.rs +++ b/rust/lance-index/src/vector/kmeans.rs @@ -3,7 +3,7 @@ use std::sync::Arc; -use arrow_array::{types::ArrowPrimitiveType, ArrayRef, FixedSizeListArray, PrimitiveArray}; +use arrow_array::{types::ArrowPrimitiveType, FixedSizeListArray, PrimitiveArray}; use lance_arrow::FixedSizeListArrayExt; use log::info; use rand::{seq::IteratorRandom, Rng}; @@ -27,7 +27,7 @@ pub fn train_kmeans( mut rng: impl Rng, distance_type: DistanceType, sample_rate: usize, -) -> Result +) -> Result where T::Native: Dot + L2 + 
Normalize, PrimitiveArray: From>, @@ -63,5 +63,5 @@ where let params = KMeansParams::new(centroids, max_iterations, redos, distance_type); let data = FixedSizeListArray::try_new_from_values(data, dimension as i32)?; let model = KMeans::new_with_params(&data, k, &params)?; - Ok(model.centroids.clone()) + Ok(model) } diff --git a/rust/lance-index/src/vector/pq/builder.rs b/rust/lance-index/src/vector/pq/builder.rs index 48cfad5d65..dd4fc63c2d 100644 --- a/rust/lance-index/src/vector/pq/builder.rs +++ b/rust/lance-index/src/vector/pq/builder.rs @@ -121,6 +121,7 @@ impl PQBuildParams { distance_type, self.sample_rate, ) + .map(|kmeans| kmeans.centroids) }) .collect::>>()?; let mut codebook_builder = PrimitiveBuilder::::with_capacity(num_centroids * dimension); diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index be77cf6477..00d368cd9a 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -287,10 +287,6 @@ impl IvfIndexBuilder "dataset not set before loading or building IVF", location!(), ))?; - let ivf_params = self.ivf_params.as_ref().ok_or(Error::invalid_input( - "IVF build params not set", - location!(), - ))?; let dim = utils::get_vector_dim(dataset.schema(), &self.column)?; match &self.ivf { @@ -299,7 +295,7 @@ impl IvfIndexBuilder Ok(ivf.clone()) } else { // retrain the IVF model with the existing indices - let mut ivf_params = ivf_params.clone(); + let mut ivf_params = IvfBuildParams::new(ivf.num_partitions()); ivf_params.retrain = true; super::build_ivf_model( @@ -313,6 +309,10 @@ impl IvfIndexBuilder } } None => { + let ivf_params = self.ivf_params.as_ref().ok_or(Error::invalid_input( + "IVF build params not set", + location!(), + ))?; super::build_ivf_model(dataset, &self.column, dim, self.distance_type, ivf_params) .await } @@ -675,7 +675,7 @@ impl IvfIndexBuilder // maintain the IVF partitions let mut storage_ivf = IvfModel::empty(); - let mut index_ivf = 
IvfModel::new(ivf.centroids.clone().unwrap()); + let mut index_ivf = IvfModel::new(ivf.centroids.clone().unwrap(), ivf.loss); let mut partition_index_metadata = Vec::with_capacity(partition_sizes.len()); let obj_store = Arc::new(ObjectStore::local()); let scheduler_config = SchedulerConfig::max_bandwidth(&obj_store); diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index 1e3eedbdb7..2b34926839 100644 --- a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -169,7 +169,7 @@ mod test { async fn test_ivf_residual_handling() { let centroids = Float32Array::from_iter(vec![1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0]); let centroids = FixedSizeListArray::try_new_from_values(centroids, 2).unwrap(); - let mut ivf = IvfModel::new(centroids); + let mut ivf = IvfModel::new(centroids, None); // Add 4 partitions for _ in 0..4 { ivf.add_partition(0); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 86f846937c..10a0eb4392 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -380,7 +380,7 @@ pub(crate) async fn optimize_vector_indices_v2( const AVG_LOSS_THRESHOLD: f64 = 1.1; let mut num_indices_to_merge = options.num_indices_to_merge; if avg_loss >= original_avg_loss * AVG_LOSS_THRESHOLD { - info!( + println!( "average loss {} of the indices is too high (> {} * {}), retrain the index", avg_loss, original_avg_loss, AVG_LOSS_THRESHOLD ); @@ -532,7 +532,7 @@ async fn optimize_ivf_pq_indices( None => None, }; - let mut ivf_mut = IvfModel::new(first_idx.ivf.centroids.clone().unwrap()); + let mut ivf_mut = IvfModel::new(first_idx.ivf.centroids.clone().unwrap(), first_idx.ivf.loss); let start_pos = existing_indices .len() @@ -609,7 +609,7 @@ async fn optimize_ivf_hnsw_indices( None => None, }; - let mut ivf_mut = IvfModel::new(first_idx.ivf.centroids.clone().unwrap()); + let mut ivf_mut = 
IvfModel::new(first_idx.ivf.centroids.clone().unwrap(), first_idx.ivf.loss); let start_pos = if options.num_indices_to_merge > existing_indices.len() { 0 @@ -1167,7 +1167,7 @@ pub async fn build_ivf_model( location: location!(), }); } - return Ok(IvfModel::new(centroids.as_ref().clone())); + return Ok(IvfModel::new(centroids.as_ref().clone(), None)); } let sample_size_hint = params.num_partitions * params.sample_rate; @@ -1703,7 +1703,7 @@ where { let rng = SmallRng::from_entropy(); const REDOS: usize = 1; - let centroids = lance_index::vector::kmeans::train_kmeans::( + let kmeans = lance_index::vector::kmeans::train_kmeans::( centroids, data, dimension, @@ -1714,10 +1714,10 @@ where metric_type, params.sample_rate, )?; - Ok(IvfModel::new(FixedSizeListArray::try_new_from_values( - centroids, - dimension as i32, - )?)) + Ok(IvfModel::new( + FixedSizeListArray::try_new_from_values(kmeans.centroids, dimension as i32)?, + Some(kmeans.loss), + )) } /// Train IVF partitions using kmeans. diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index 06c8c13361..a96e8e4720 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -610,7 +610,7 @@ mod tests { let centroids = generate_random_array_with_range::(4 * DIM, -1.0..1.0); let fsl = FixedSizeListArray::try_new_from_values(centroids, DIM as i32).unwrap(); - let ivf = IvfModel::new(fsl); + let ivf = IvfModel::new(fsl, None); let params = PQBuildParams::new(16, 8); let pq = build_pq_model(&dataset, "vector", DIM, MetricType::L2, &params, Some(&ivf)) .await From fc02fe9d93a15eb0ba151e22937d4e2524e3dd23 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 6 Mar 2025 17:22:48 +0800 Subject: [PATCH 09/22] log Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 10a0eb4392..a3d8fe1362 100644 --- 
a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -380,7 +380,7 @@ pub(crate) async fn optimize_vector_indices_v2( const AVG_LOSS_THRESHOLD: f64 = 1.1; let mut num_indices_to_merge = options.num_indices_to_merge; if avg_loss >= original_avg_loss * AVG_LOSS_THRESHOLD { - println!( + info!( "average loss {} of the indices is too high (> {} * {}), retrain the index", avg_loss, original_avg_loss, AVG_LOSS_THRESHOLD ); From 5eb8850d72f011efdd4efdd0d5b3d4915a9aefa2 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 6 Mar 2025 18:25:50 +0800 Subject: [PATCH 10/22] fix Signed-off-by: BubbleCal --- protos/index.proto | 2 +- rust/lance-index/src/vector/ivf/storage.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/protos/index.proto b/protos/index.proto index 178c0b1c44..e7eb7f4818 100644 --- a/protos/index.proto +++ b/protos/index.proto @@ -68,7 +68,7 @@ message IVF { Tensor centroids_tensor = 4; // KMeans loss. - double loss = 5; + optional double loss = 5; } // Product Quantization. 
diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index ce016cf07b..745d7cceeb 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -180,7 +180,7 @@ impl TryFrom<&IvfModel> for PbIvf { lengths, offsets: ivf.offsets.iter().map(|x| *x as u64).collect(), centroids_tensor: ivf.centroids.as_ref().map(|c| c.try_into()).transpose()?, - loss: ivf.loss.unwrap_or_default(), + loss: ivf.loss, }) } } @@ -229,7 +229,7 @@ impl TryFrom for IvfModel { centroids, offsets, lengths: proto.lengths, - loss: Some(proto.loss), + loss: proto.loss, }) } } @@ -311,7 +311,7 @@ mod tests { lengths: vec![2, 2], offsets: vec![0, 2], centroids_tensor: None, - loss: 0.0, + loss: None, }; let ivf = IvfModel::try_from(pb_ivf).unwrap(); From 8dbc1410ffee296215a0120803b449f2749d1151 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 7 Mar 2025 15:19:39 +0800 Subject: [PATCH 11/22] fix Signed-off-by: BubbleCal --- rust/lance-index/src/vector.rs | 2 + rust/lance-index/src/vector/hnsw/index.rs | 6 + rust/lance-index/src/vector/ivf/storage.rs | 4 +- rust/lance-index/src/vector/ivf/transform.rs | 9 +- rust/lance-index/src/vector/storage.rs | 4 + rust/lance-linalg/src/kmeans.rs | 12 +- rust/lance/src/index/vector/fixture_test.rs | 4 + rust/lance/src/index/vector/ivf.rs | 29 +- rust/lance/src/index/vector/ivf/v2.rs | 292 ++++++++++++++----- rust/lance/src/index/vector/pq.rs | 6 + rust/lance/src/session/index_extension.rs | 4 + 11 files changed, 275 insertions(+), 97 deletions(-) diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index 093d331908..9f9fc20ae0 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -195,6 +195,8 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index { // for SubIndex only async fn to_batch_stream(&self, with_vector: bool) -> Result; + fn num_rows(&self) -> u64; + /// Return the IDs of rows in the index. 
fn row_ids(&self) -> Box + '_>; diff --git a/rust/lance-index/src/vector/hnsw/index.rs b/rust/lance-index/src/vector/hnsw/index.rs index 27a4a8e44a..f12be997a2 100644 --- a/rust/lance-index/src/vector/hnsw/index.rs +++ b/rust/lance-index/src/vector/hnsw/index.rs @@ -293,6 +293,12 @@ impl VectorIndex for HNSWIndex { Ok(Box::pin(stream)) } + fn num_rows(&self) -> u64 { + self.hnsw + .as_ref() + .map_or(0, |hnsw| hnsw.num_nodes(0) as u64) + } + fn row_ids(&self) -> Box + '_> { Box::new(self.storage.as_ref().unwrap().row_ids()) } diff --git a/rust/lance-index/src/vector/ivf/storage.rs b/rust/lance-index/src/vector/ivf/storage.rs index 745d7cceeb..5f626943f8 100644 --- a/rust/lance-index/src/vector/ivf/storage.rs +++ b/rust/lance-index/src/vector/ivf/storage.rs @@ -97,8 +97,8 @@ impl IvfModel { self.lengths.iter().map(|x| *x as u64).sum() } - pub fn avg_loss(&self) -> Option { - self.loss.map(|loss| loss / self.num_rows() as f64) + pub fn loss(&self) -> Option { + self.loss } /// Use the query vector to find `nprobes` closest partitions. diff --git a/rust/lance-index/src/vector/ivf/transform.rs b/rust/lance-index/src/vector/ivf/transform.rs index f1841940b9..9091640830 100644 --- a/rust/lance-index/src/vector/ivf/transform.rs +++ b/rust/lance-index/src/vector/ivf/transform.rs @@ -59,6 +59,7 @@ impl PartitionTransformer { pub(super) fn compute_partitions(&self, data: &FixedSizeListArray) -> UInt32Array { compute_partitions_arrow_array(&self.centroids, data, self.distance_type) .expect("failed to compute partitions") + .0 .into() } } @@ -91,9 +92,13 @@ impl Transformer for PartitionTransformer { location: location!(), })?; - let part_ids = self.compute_partitions(fsl); + let (part_ids, loss) = + compute_partitions_arrow_array(&self.centroids, fsl, self.distance_type)?; + let part_ids = UInt32Array::from(part_ids); let field = Field::new(PART_ID_COLUMN, part_ids.data_type().clone(), true); - Ok(batch.try_with_column(field, Arc::new(part_ids))?) 
+ Ok(batch + .try_with_column(field, Arc::new(part_ids))? + .add_metadata("loss".to_owned(), loss.to_string())?) } } diff --git a/rust/lance-index/src/vector/storage.rs b/rust/lance-index/src/vector/storage.rs index ee740fa153..c7aba4d0dd 100644 --- a/rust/lance-index/src/vector/storage.rs +++ b/rust/lance-index/src/vector/storage.rs @@ -254,6 +254,10 @@ impl IvfQuantizationStorage { }) } + pub fn num_rows(&self) -> u64 { + self.reader.num_rows() + } + pub fn quantizer(&self) -> Result { let metadata = serde_json::from_str(&self.metadata[0])?; Q::from_metadata(&metadata, self.distance_type) diff --git a/rust/lance-linalg/src/kmeans.rs b/rust/lance-linalg/src/kmeans.rs index 7b5818d132..59afbecadb 100644 --- a/rust/lance-linalg/src/kmeans.rs +++ b/rust/lance-linalg/src/kmeans.rs @@ -695,7 +695,7 @@ pub fn compute_partitions_arrow_array( centroids: &FixedSizeListArray, vectors: &FixedSizeListArray, distance_type: DistanceType, -) -> Result>> { +) -> Result<(Vec>, f64)> { if centroids.value_length() != vectors.value_length() { return Err(ArrowError::InvalidArgumentError( "Centroids and vectors have different dimensions".to_string(), @@ -749,18 +749,17 @@ pub fn compute_partitions>( vectors: &PrimitiveArray, dimension: impl AsPrimitive, distance_type: DistanceType, -) -> Vec> +) -> (Vec>, f64) where T::Native: Num, { let dimension = dimension.as_(); - let (membership, _) = K::compute_membership_and_loss( + K::compute_membership_and_loss( centroids.values(), vectors.values(), dimension, distance_type, - ); - membership + ) } #[inline] @@ -826,7 +825,7 @@ mod tests { ) }) .collect::>(); - let actual = compute_partitions::>( + let (actual, _) = compute_partitions::>( &centroids, &data, DIM, @@ -867,6 +866,7 @@ mod tests { DIM, DistanceType::L2, ) + .0 .iter() .for_each(|cd| { assert!(cd.is_none()); diff --git a/rust/lance/src/index/vector/fixture_test.rs b/rust/lance/src/index/vector/fixture_test.rs index 2b34926839..7484842248 100644 ---
a/rust/lance/src/index/vector/fixture_test.rs +++ b/rust/lance/src/index/vector/fixture_test.rs @@ -135,6 +135,10 @@ mod test { Ok(Box::new(self.clone())) } + fn num_rows(&self) -> u64 { + self.ret_val.num_rows() as u64 + } + fn row_ids(&self) -> Box> { todo!("this method is for only IVF_HNSW_* index"); } diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index a3d8fe1362..b77f788adc 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -71,7 +71,7 @@ use lance_linalg::{ kernels::{normalize_arrow, normalize_fsl}, }; use lazy_static::lazy_static; -use log::info; +use log::{info, warn}; use object_store::path::Path; use rand::{rngs::SmallRng, SeedableRng}; use roaring::RoaringBitmap; @@ -366,25 +366,28 @@ pub(crate) async fn optimize_vector_indices_v2( let num_partitions = ivf_model.num_partitions(); let index_type = existing_indices[0].sub_index_type(); - let original_avg_loss = ivf_model.avg_loss().unwrap_or_default(); + let original_avg_loss = ivf_model + .loss() + .map(|loss| loss / existing_indices[0].num_rows() as f64); let total_loss = existing_indices .iter() - .map(|idx| idx.ivf_model().loss.unwrap_or_default()) + .map(|idx| idx.ivf_model().loss().unwrap_or_default()) .sum::(); let total_rows = existing_indices .iter() - .map(|idx| idx.ivf_model().num_rows()) + .map(|idx| idx.num_rows()) .sum::(); let avg_loss = total_loss / total_rows as f64; - const AVG_LOSS_THRESHOLD: f64 = 1.1; let mut num_indices_to_merge = options.num_indices_to_merge; - if avg_loss >= original_avg_loss * AVG_LOSS_THRESHOLD { - info!( - "average loss {} of the indices is too high (> {} * {}), retrain the index", - avg_loss, original_avg_loss, AVG_LOSS_THRESHOLD - ); - num_indices_to_merge = existing_indices.len(); + if let Some(original_avg_loss) = original_avg_loss { + if avg_loss >= original_avg_loss * *AVG_LOSS_RETRAIN_THRESHOLD { + warn!( + "average loss {} of the indices is too high (> {} * {}), retrain the 
index", + avg_loss, original_avg_loss, *AVG_LOSS_RETRAIN_THRESHOLD + ); + num_indices_to_merge = existing_indices.len(); + } } let temp_dir = tempfile::tempdir()?; @@ -953,6 +956,10 @@ impl VectorIndex for IVFIndex { unimplemented!("this method is for only sub index") } + fn num_rows(&self) -> u64 { + self.ivf.num_rows() + } + fn row_ids(&self) -> Box> { todo!("this method is for only IVF_HNSW_* index"); } diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 217eb94b2f..7e04219e85 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -511,6 +511,10 @@ impl VectorIndex for IVFInd unimplemented!("this method is for only sub index"); } + fn num_rows(&self) -> u64 { + self.storage.num_rows() + } + fn row_ids(&self) -> Box + '_> { todo!("this method is for only IVF_HNSW_* index"); } @@ -571,22 +575,24 @@ pub type IvfHnswPqIndex = IVFIndex; #[cfg(test)] mod tests { use std::collections::HashSet; - use std::{collections::HashMap, ops::Range, sync::Arc}; + use std::{ops::Range, sync::Arc}; use all_asserts::{assert_ge, assert_lt}; use arrow::datatypes::{UInt64Type, UInt8Type}; use arrow::{array::AsArray, datatypes::Float32Type}; use arrow_array::{ - Array, ArrowPrimitiveType, FixedSizeListArray, ListArray, RecordBatch, RecordBatchIterator, - UInt64Array, + Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, ListArray, RecordBatch, + RecordBatchIterator, UInt64Array, }; use arrow_buffer::OffsetBuffer; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; use itertools::Itertools; use lance_arrow::FixedSizeListArrayExt; use lance_core::ROW_ID; + use lance_index::optimize::OptimizeOptions; use lance_index::vector::hnsw::builder::HnswBuildParams; + use lance_index::vector::ivf::storage::IvfModel; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::pq::PQBuildParams; use lance_index::vector::sq::builder::SQBuildParams; 
@@ -601,6 +607,8 @@ mod tests { use crate::dataset::optimize::{compact_files, CompactionOptions}; use crate::dataset::UpdateBuilder; + use crate::index::vector::ivf::AVG_LOSS_RETRAIN_THRESHOLD; + use crate::index::DatasetIndexInternalExt; use crate::{index::vector::VectorIndexParams, Dataset}; const DIM: usize = 32; @@ -612,87 +620,116 @@ mod tests { where T::Native: SampleUniform, { - let ids = Arc::new(UInt64Array::from_iter_values(0..1000)); - let vectors = generate_random_array_with_range::(1000 * DIM, range); - let metadata: HashMap = vec![("test".to_string(), "ivf_pq".to_string())] - .into_iter() - .collect(); - let data_type = vectors.data_type().clone(); - let schema: Arc<_> = Schema::new(vec![ - Field::new("id", DataType::UInt64, false), - Field::new( - "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", data_type.clone(), true)), - DIM as i32, - ), - true, - ), - ]) - .with_metadata(metadata) - .into(); - let mut fsl = FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); - if data_type != DataType::UInt8 { - fsl = lance_linalg::kernels::normalize_fsl(&fsl).unwrap(); - } - let array = Arc::new(fsl); - let batch = RecordBatch::try_new(schema.clone(), vec![ids, array.clone()]).unwrap(); - - let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); + let (batch, schema) = generate_batch::(500, None, range, false); + let vectors = batch.column_by_name("vector").unwrap().clone(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); let dataset = Dataset::write(batches, test_uri, None).await.unwrap(); - (dataset, array) + (dataset, Arc::new(vectors.as_fixed_size_list().clone())) } async fn generate_multivec_test_dataset( test_uri: &str, range: Range, ) -> (Dataset, Arc) + where + T::Native: SampleUniform, + { + let (batch, schema) = generate_batch::(500, None, range, true); + let vectors = batch.column_by_name("vector").unwrap().clone(); + let batches = 
RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + let dataset = Dataset::write(batches, test_uri, None).await.unwrap(); + (dataset, Arc::new(vectors.as_list::().clone())) + } + + async fn append_dataset( + dataset: &mut Dataset, + num_rows: usize, + range: Range, + ) -> ArrayRef + where + T::Native: SampleUniform, + { + let is_multivector = matches!( + dataset.schema().field("vector").unwrap().data_type(), + DataType::List(_) + ); + let row_count = dataset.count_all_rows().await.unwrap(); + let (batch, schema) = + generate_batch::(num_rows, Some(row_count as u64), range, is_multivector); + let vectors = batch["vector"].clone(); + let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema); + dataset.append(batches, None).await.unwrap(); + vectors + } + + fn generate_batch( + num_rows: usize, + start_id: Option, + range: Range, + is_multivector: bool, + ) -> (RecordBatch, SchemaRef) where T::Native: SampleUniform, { const VECTOR_NUM_PER_ROW: usize = 5; - let vectors = generate_random_array_with_range::(1000 * VECTOR_NUM_PER_ROW * DIM, range); - let metadata: HashMap = vec![("test".to_string(), "ivf_pq".to_string())] - .into_iter() - .collect(); + let start_id = start_id.unwrap_or(0); + let ids = Arc::new(UInt64Array::from_iter_values( + start_id..start_id + num_rows as u64, + )); + let total_floats = match is_multivector { + true => num_rows * VECTOR_NUM_PER_ROW * DIM, + false => num_rows * DIM, + }; + let vectors = generate_random_array_with_range::(total_floats, range); let data_type = vectors.data_type().clone(); - let schema: Arc<_> = Schema::new(vec![Field::new( - "vector", - DataType::List(Arc::new(Field::new( - "item", - DataType::FixedSizeList( - Arc::new(Field::new("item", data_type.clone(), true)), - DIM as i32, - ), - true, - ))), - true, - )]) - .with_metadata(metadata) - .into(); + let mut fields = vec![Field::new("id", DataType::UInt64, false)]; + let mut arrays: Vec = vec![ids]; let mut fsl = 
FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); if data_type != DataType::UInt8 { fsl = lance_linalg::kernels::normalize_fsl(&fsl).unwrap(); } - - let array = Arc::new(ListArray::new( - Arc::new(Field::new( - "item", + if is_multivector { + fields.push(Field::new( + "vector", + DataType::List(Arc::new(Field::new( + "item", + DataType::FixedSizeList( + Arc::new(Field::new("item", data_type.clone(), true)), + DIM as i32, + ), + true, + ))), + true, + )); + let array = Arc::new(ListArray::new( + Arc::new(Field::new( + "item", + DataType::FixedSizeList( + Arc::new(Field::new("item", data_type.clone(), true)), + DIM as i32, + ), + true, + )), + OffsetBuffer::from_lengths(std::iter::repeat(VECTOR_NUM_PER_ROW).take(num_rows)), + Arc::new(fsl), + None, + )); + arrays.push(array); + } else { + fields.push(Field::new( + "vector", DataType::FixedSizeList( Arc::new(Field::new("item", data_type.clone(), true)), DIM as i32, ), true, - )), - OffsetBuffer::from_lengths(std::iter::repeat(VECTOR_NUM_PER_ROW).take(1000)), - Arc::new(fsl), - None, - )); - let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap(); - - let batches = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone()); - let dataset = Dataset::write(batches, test_uri, None).await.unwrap(); - (dataset, array) + )); + let array = Arc::new(fsl); + arrays.push(array); + } + let schema: Arc<_> = Schema::new(fields).into(); + let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); + (batch, schema) } #[allow(dead_code)] @@ -842,10 +879,10 @@ mod tests { let query = vectors.value(0); // delete half rows to trigger compact - dataset.delete("id < 500").await.unwrap(); + dataset.delete("id < 250").await.unwrap(); // update the other half rows let update_result = UpdateBuilder::new(Arc::new(dataset)) - .update_where("id >= 500 and id<600") + .update_where("id >= 250 and id<300") .unwrap() .set("id", "500+id") .unwrap() @@ -858,14 +895,14 @@ mod tests { 
.await .unwrap(); let num_rows = dataset.count_rows(None).await.unwrap(); - assert_eq!(num_rows, 500); + assert_eq!(num_rows, 250); compact_files(&mut dataset, CompactionOptions::default(), None) .await .unwrap(); // query again, the result should not include the deleted row let result = dataset .scan() - .nearest(vector_column, query.as_primitive::(), 500) + .nearest(vector_column, query.as_primitive::(), 250) .unwrap() .nprobs(nlist) .with_row_id() @@ -873,12 +910,109 @@ mod tests { .await .unwrap(); let row_ids = result["id"].as_primitive::(); - assert_eq!(row_ids.len(), 500); + assert_eq!(row_ids.len(), 250); row_ids.values().iter().for_each(|id| { - assert!(*id >= 600); + assert!(*id >= 300); }); } + async fn test_optimize_strategy(params: VectorIndexParams) { + match params.metric_type { + DistanceType::Hamming => { + test_optimize_strategy_impl::(params, 0..2).await; + } + _ => { + test_optimize_strategy_impl::(params, 0.0..1.0).await; + } + } + } + + async fn test_optimize_strategy_impl( + params: VectorIndexParams, + range: Range, + ) where + T::Native: SampleUniform + std::ops::Add, + { + let test_dir = tempdir().unwrap(); + let test_uri = test_dir.path().to_str().unwrap(); + let (mut dataset, _) = generate_test_dataset::(test_uri, range.clone()).await; + + let vector_column = "vector"; + dataset + .create_index(&[vector_column], IndexType::Vector, None, &params, true) + .await + .unwrap(); + + async fn get_ivf_models(dataset: &Dataset) -> Vec { + let indices = dataset.load_indices_by_name("vector_idx").await.unwrap(); + let mut ivf_models = vec![]; + for idx in indices { + let index = dataset + .open_vector_index("vector", idx.uuid.to_string().as_str()) + .await + .unwrap(); + ivf_models.push(index.ivf_model().clone()); + } + ivf_models + } + + async fn get_avg_loss(dataset: &Dataset) -> f64 { + let ivf_models = get_ivf_models(dataset).await; + let mut loss = 0.0; + for ivf in ivf_models { + loss += ivf.loss.unwrap_or_default(); + } + let num_rows =
dataset.count_rows(None).await.unwrap(); + loss / num_rows as f64 + } + + let original_ivf = get_ivf_models(&dataset).await; + let original_avg_loss = get_avg_loss(&dataset).await; + let original_ivf = &original_ivf[0]; + let mut count = 0; + // append more rows and make delta index until hitting the retrain threshold + loop { + let range = range.start..range.end + range.end + range.end + range.end + range.end; + append_dataset::(&mut dataset, 500, range).await; + dataset + .optimize_indices(&OptimizeOptions { + num_indices_to_merge: 0, + index_names: None, + }) + .await + .unwrap(); + count += 1; + + let new_avg_loss = get_avg_loss(&dataset).await; + if new_avg_loss / original_avg_loss >= *AVG_LOSS_RETRAIN_THRESHOLD { + break; + } + + // all delta indices should have the same centroids as the original index + let ivf_models = get_ivf_models(&dataset).await; + assert_eq!(ivf_models.len(), count + 1); + for ivf in ivf_models { + assert_eq!(original_ivf.centroids, ivf.centroids); + } + } + + // this optimize would merge all indices and retrain the IVF + dataset + .optimize_indices(&OptimizeOptions { + num_indices_to_merge: 0, + index_names: None, + }) + .await + .unwrap(); + let stats = dataset.index_statistics("vector_idx").await.unwrap(); + let stats = serde_json::to_value(stats).unwrap(); + assert_eq!(stats["num_indices"], 1); + + let ivf_models = get_ivf_models(&dataset).await; + let ivf = &ivf_models[0]; + assert_ne!(original_ivf.centroids, ivf.centroids); + } + #[tokio::test] async fn test_flat_knn() { test_distance_range(None, 4).await; @@ -901,7 +1035,8 @@ mod tests { test_index_multivec(params.clone(), nlist, recall_requirement).await; } test_distance_range(Some(params.clone()), nlist).await; - test_remap(params, nlist).await; + test_remap(params.clone(), nlist).await; + test_optimize_strategy(params).await; } #[rstest] @@ -945,7 +1080,8 @@ mod tests { test_index_multivec(params.clone(), nlist, recall_requirement).await; } 
test_distance_range(Some(params.clone()), nlist).await; - test_remap(params, nlist).await; + test_remap(params.clone(), nlist).await; + test_optimize_strategy(params).await; } #[rstest] @@ -967,7 +1103,8 @@ mod tests { if distance_type == DistanceType::Cosine { test_index_multivec(params.clone(), nlist, recall_requirement).await; } - test_remap(params, nlist).await; + test_remap(params.clone(), nlist).await; + test_optimize_strategy(params).await; } #[rstest] @@ -991,8 +1128,9 @@ mod tests { ); test_index(params.clone(), nlist, recall_requirement).await; if distance_type == DistanceType::Cosine { - test_index_multivec(params, nlist, recall_requirement).await; + test_index_multivec(params.clone(), nlist, recall_requirement).await; } + test_optimize_strategy(params).await; } #[rstest] @@ -1016,8 +1154,9 @@ mod tests { ); test_index(params.clone(), nlist, recall_requirement).await; if distance_type == DistanceType::Cosine { - test_index_multivec(params, nlist, recall_requirement).await; + test_index_multivec(params.clone(), nlist, recall_requirement).await; } + test_optimize_strategy(params).await; } #[rstest] @@ -1041,8 +1180,9 @@ mod tests { ); test_index(params.clone(), nlist, recall_requirement).await; if distance_type == DistanceType::Cosine { - test_index_multivec(params, nlist, recall_requirement).await; + test_index_multivec(params.clone(), nlist, recall_requirement).await; } + test_optimize_strategy(params).await; } async fn test_index_multivec(params: VectorIndexParams, nlist: usize, recall_requirement: f32) { diff --git a/rust/lance/src/index/vector/pq.rs b/rust/lance/src/index/vector/pq.rs index a96e8e4720..3826c97b99 100644 --- a/rust/lance/src/index/vector/pq.rs +++ b/rust/lance/src/index/vector/pq.rs @@ -368,6 +368,12 @@ impl VectorIndex for PQIndex { Ok(Box::pin(stream)) } + fn num_rows(&self) -> u64 { + self.row_ids + .as_ref() + .map_or(0, |row_ids| row_ids.len() as u64) + } + fn row_ids(&self) -> Box> { todo!("this method is for only IVF_HNSW_* 
index"); } diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index d9cfac9041..16a092c326 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -162,6 +162,10 @@ mod test { unimplemented!() } + fn num_rows(&self) -> u64 { + unimplemented!() + } + fn row_ids(&self) -> Box> { unimplemented!() } From 0a4c78c34f68d7edb4c642766194a737ea3daeb5 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 14:09:38 +0800 Subject: [PATCH 12/22] fix Signed-off-by: BubbleCal --- rust/lance-index/src/vector/ivf.rs | 5 ++++- rust/lance-index/src/vector/residual.rs | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/lance-index/src/vector/ivf.rs b/rust/lance-index/src/vector/ivf.rs index 1138475d65..649f16acbb 100644 --- a/rust/lance-index/src/vector/ivf.rs +++ b/rust/lance-index/src/vector/ivf.rs @@ -244,7 +244,10 @@ impl IvfTransformer { #[inline] pub fn compute_partitions(&self, data: &FixedSizeListArray) -> Result { - Ok(compute_partitions_arrow_array(&self.centroids, data, self.distance_type)?.into()) + Ok( + compute_partitions_arrow_array(&self.centroids, data, self.distance_type) + .map(|(part_ids, _)| part_ids.into())?, + ) } pub fn find_partitions(&self, query: &dyn Array, nprobes: usize) -> Result { diff --git a/rust/lance-index/src/vector/residual.rs b/rust/lance-index/src/vector/residual.rs index 40a8e0d770..6b79925dcd 100644 --- a/rust/lance-index/src/vector/residual.rs +++ b/rust/lance-index/src/vector/residual.rs @@ -77,6 +77,7 @@ where dimension, distance_type.expect("provide either partitions or distance type"), ) + .0 .into() }); let part_ids = part_ids.values(); From 6a9c8f5739071f5ef6362b51ab827d55080576dd Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 14:21:26 +0800 Subject: [PATCH 13/22] retrain only if hitting the threshold Signed-off-by: BubbleCal --- rust/lance/src/index/vector/builder.rs | 14 +++++++++++--- 
rust/lance/src/index/vector/ivf.rs | 6 ++++++ rust/lance/src/index/vector/ivf/v2.rs | 7 ++----- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index f6cbf92fb0..8b90792e8e 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -73,6 +73,7 @@ pub struct IvfIndexBuilder { column: String, index_dir: Path, distance_type: DistanceType, + retrain: bool, // build params, only needed for building new IVF, quantizer dataset: Option, shuffler: Option>, @@ -111,6 +112,7 @@ impl IvfIndexBuilder column, index_dir, distance_type, + retrain: false, dataset: Some(dataset), shuffler: Some(shuffler.into()), ivf_params, @@ -169,6 +171,7 @@ impl IvfIndexBuilder column, index_dir, distance_type: ivf_index.metric_type(), + retrain: false, dataset: None, shuffler: None, ivf_params: None, @@ -282,6 +285,11 @@ impl IvfIndexBuilder self } + pub fn retrain(&mut self, retrain: bool) -> &mut Self { + self.retrain = retrain; + self + } + async fn load_or_build_ivf(&self) -> Result { let dataset = self.dataset.as_ref().ok_or(Error::invalid_input( "dataset not set before loading or building IVF", @@ -291,9 +299,7 @@ impl IvfIndexBuilder let dim = utils::get_vector_dim(dataset.schema(), &self.column)?; match &self.ivf { Some(ivf) => { - if self.existing_indices.is_empty() { - Ok(ivf.clone()) - } else { + if self.retrain { // retrain the IVF model with the existing indices let mut ivf_params = IvfBuildParams::new(ivf.num_partitions()); ivf_params.retrain = true; @@ -306,6 +312,8 @@ impl IvfIndexBuilder &ivf_params, ) .await + } else { + Ok(ivf.clone()) } } None => { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 00e1606767..b8f76d5c4a 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -380,12 +380,14 @@ pub(crate) async fn optimize_vector_indices_v2( let avg_loss = total_loss 
/ total_rows as f64; let mut num_indices_to_merge = options.num_indices_to_merge; + let mut retrain = false; if let Some(original_avg_loss) = original_avg_loss { if avg_loss >= original_avg_loss * *AVG_LOSS_RETRAIN_THRESHOLD { warn!( "average loss {} of the indices is too high (> {} * {}), retrain the index", avg_loss, original_avg_loss, *AVG_LOSS_RETRAIN_THRESHOLD ); + retrain = true; num_indices_to_merge = existing_indices.len(); } } @@ -414,6 +416,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) + .retrain(retrain) .shuffle_data(unindexed) .await? .build() @@ -432,6 +435,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) + .retrain(retrain) .shuffle_data(unindexed) .await? .build() @@ -453,6 +457,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) + .retrain(retrain) .shuffle_data(unindexed) .await? .build() @@ -474,6 +479,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) + .retrain(retrain) .shuffle_data(unindexed) .await? 
.build() diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index df222d28b8..6782f7c8e5 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -738,7 +738,7 @@ mod tests { Arc::new(Field::new( "item", DataType::FixedSizeList( - Arc::new(Field::new("item", data_type.clone(), true)), + Arc::new(Field::new("item", data_type, true)), DIM as i32, ), true, @@ -751,10 +751,7 @@ mod tests { } else { fields.push(Field::new( "vector", - DataType::FixedSizeList( - Arc::new(Field::new("item", data_type.clone(), true)), - DIM as i32, - ), + DataType::FixedSizeList(Arc::new(Field::new("item", data_type, true)), DIM as i32), true, )); let array = Arc::new(fsl); From de30b3596c19eb753032618441a26dc50a949ec0 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 14:26:04 +0800 Subject: [PATCH 14/22] fix Signed-off-by: BubbleCal --- python/src/utils.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/python/src/utils.rs b/python/src/utils.rs index 1b1a78cd02..f52d2844b6 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -137,13 +137,15 @@ impl KMeans { }; let values = fixed_size_arr.values().as_primitive(); let centroids = kmeans.centroids.as_primitive(); - let cluster_ids = - UInt32Array::from(compute_partitions::< - Float32Type, - KMeansAlgoFloat, - >( - centroids, values, kmeans.dimension, kmeans.distance_type - )); + let cluster_ids = UInt32Array::from( + compute_partitions::>( + centroids, + values, + kmeans.dimension, + kmeans.distance_type, + ) + .0, + ); cluster_ids.into_data().to_pyarrow(py) } From ebd7efc90f02e878e41ef005648aa61a7124082e Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 17:23:39 +0800 Subject: [PATCH 15/22] more tests Signed-off-by: BubbleCal --- rust/lance-index/src/vector.rs | 1 + rust/lance-index/src/vector/ivf/transform.rs | 10 ++-- rust/lance-index/src/vector/v3/shuffler.rs | 32 +++++++++-- 
rust/lance/src/index/vector/builder.rs | 42 ++++++++++----- rust/lance/src/index/vector/ivf/v2.rs | 57 +++++++++++--------- 5 files changed, 96 insertions(+), 46 deletions(-) diff --git a/rust/lance-index/src/vector.rs b/rust/lance-index/src/vector.rs index 50e7bec432..fe2d698e9f 100644 --- a/rust/lance-index/src/vector.rs +++ b/rust/lance-index/src/vector.rs @@ -49,6 +49,7 @@ pub const INDEX_UUID_COLUMN: &str = "__index_uuid"; pub const PART_ID_COLUMN: &str = "__ivf_part_id"; pub const PQ_CODE_COLUMN: &str = "__pq_code"; pub const SQ_CODE_COLUMN: &str = "__sq_code"; +pub const LOSS_METADATA_KEY: &str = "_loss"; lazy_static! { pub static ref VECTOR_RESULT_SCHEMA: arrow_schema::SchemaRef = diff --git a/rust/lance-index/src/vector/ivf/transform.rs b/rust/lance-index/src/vector/ivf/transform.rs index 9091640830..7f80b18826 100644 --- a/rust/lance-index/src/vector/ivf/transform.rs +++ b/rust/lance-index/src/vector/ivf/transform.rs @@ -20,13 +20,15 @@ use lance_linalg::distance::DistanceType; use lance_linalg::kmeans::compute_partitions_arrow_array; use crate::vector::transform::Transformer; +use crate::vector::LOSS_METADATA_KEY; use super::PART_ID_COLUMN; /// PartitionTransformer /// /// It computes the partition ID for each row from the input batch, -/// and adds the partition ID as a new column to the batch. +/// and adds the partition ID as a new column to the batch, +/// and adds the loss as a metadata to the batch. /// /// If the partition ID ("__ivf_part_id") column is already present in the Batch, /// this transform is a Noop. 
@@ -75,7 +77,7 @@ impl Transformer for PartitionTransformer { .column_by_name(&self.input_column) .ok_or_else(|| lance_core::Error::Index { message: format!( - "IvfTransformer: column {} not found in the RecordBatch", + "PartitionTransformer: column {} not found in the RecordBatch", self.input_column ), location: location!(), @@ -85,7 +87,7 @@ impl Transformer for PartitionTransformer { .as_fixed_size_list_opt() .ok_or_else(|| lance_core::Error::Index { message: format!( - "IvfTransformer: column {} is not a FixedSizeListArray: {}", + "PartitionTransformer: column {} is not a FixedSizeListArray: {}", self.input_column, arr.data_type(), ), @@ -98,7 +100,7 @@ impl Transformer for PartitionTransformer { let field = Field::new(PART_ID_COLUMN, part_ids.data_type().clone(), true); Ok(batch .try_with_column(field, Arc::new(part_ids))? - .add_metadata("loss".to_owned(), loss.to_string())?) + .add_metadata(LOSS_METADATA_KEY.to_owned(), loss.to_string())?) } } diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index c791faba9b..ffd16a6605 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -31,7 +31,7 @@ use object_store::path::Path; use snafu::location; use tokio::sync::Mutex; -use crate::vector::PART_ID_COLUMN; +use crate::vector::{LOSS_METADATA_KEY, PART_ID_COLUMN}; #[async_trait::async_trait] /// A reader that can read the shuffled partitions. @@ -46,6 +46,12 @@ pub trait ShuffleReader: Send + Sync { /// Get the size of the partition by partition_id fn partition_size(&self, partition_id: usize) -> Result; + + /// Get the total loss, + /// if the loss is not available, return None, + /// in such case, the caller should sum up the losses from each batch's metadata. + /// Must be called after all partitions are read. 
+ fn total_loss(&self) -> Option; } #[async_trait::async_trait] @@ -105,6 +111,12 @@ impl Shuffler for IvfShuffler { spawn_cpu(move || { let batch = batch?; + let loss = batch + .metadata() + .get(LOSS_METADATA_KEY) + .map(|s| s.parse::().unwrap_or_default()) + .unwrap_or_default(); + let part_ids: &UInt32Array = batch .column_by_name(PART_ID_COLUMN) .expect("Partition ID column not found") @@ -134,7 +146,7 @@ impl Shuffler for IvfShuffler { start = end; } - Ok::>, Error>(partition_buffers) + Ok::<(Vec>, f64), Error>((partition_buffers, loss)) }) }) .buffered(get_num_compute_intensive_cpus()); @@ -146,8 +158,10 @@ impl Shuffler for IvfShuffler { .collect::>(); let mut counter = 0; + let mut total_loss = 0.0; while let Some(shuffled) = parallel_sort_stream.next().await { - let shuffled = shuffled?; + let (shuffled, loss) = shuffled?; + total_loss += loss; for (part_id, batches) in shuffled.into_iter().enumerate() { let part_batches = &mut partition_buffers[part_id]; @@ -218,6 +232,7 @@ impl Shuffler for IvfShuffler { self.object_store.clone(), self.output_dir.clone(), partition_sizes, + total_loss, ))) } } @@ -226,6 +241,7 @@ pub struct IvfShufflerReader { scheduler: Arc, output_dir: Path, partition_sizes: Vec, + loss: f64, } impl IvfShufflerReader { @@ -233,6 +249,7 @@ impl IvfShufflerReader { object_store: Arc, output_dir: Path, partition_sizes: Vec, + loss: f64, ) -> Self { let scheduler_config = SchedulerConfig::max_bandwidth(&object_store); let scheduler = ScanScheduler::new(object_store, scheduler_config); @@ -240,6 +257,7 @@ impl IvfShufflerReader { scheduler, output_dir, partition_sizes, + loss, } } } @@ -275,6 +293,10 @@ impl ShuffleReader for IvfShufflerReader { fn partition_size(&self, partition_id: usize) -> Result { Ok(self.partition_sizes[partition_id]) } + + fn total_loss(&self) -> Option { + Some(self.loss) + } } pub struct SinglePartitionReader { @@ -311,4 +333,8 @@ impl ShuffleReader for SinglePartitionReader { // so we just return 1 here Ok(1) } + 
+ fn total_loss(&self) -> Option { + None + } } diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 8b90792e8e..c28dfb443b 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -24,7 +24,7 @@ use lance_index::vector::quantizer::{ use lance_index::vector::storage::STORAGE_METADATA_KEY; use lance_index::vector::v3::shuffler::IvfShufflerReader; use lance_index::vector::v3::subindex::SubIndexType; -use lance_index::vector::{VectorIndex, PART_ID_FIELD}; +use lance_index::vector::{VectorIndex, LOSS_METADATA_KEY, PART_ID_FIELD}; use lance_index::{ pb, vector::{ @@ -451,6 +451,7 @@ impl IvfIndexBuilder Arc::new(self.store.clone()), self.temp_dir.clone(), vec![0; ivf.num_partitions()], + 0.0, ))); return Ok(self); } @@ -474,7 +475,7 @@ impl IvfIndexBuilder "dataset not set before building partitions", location!(), ))?; - let ivf = self.ivf.as_ref().ok_or(Error::invalid_input( + let ivf = self.ivf.as_mut().ok_or(Error::invalid_input( "IVF not set before building partitions", location!(), ))?; @@ -503,10 +504,10 @@ impl IvfIndexBuilder let dataset = Arc::new(dataset.clone()); let reader = reader.clone(); - let ivf = Arc::new(ivf.clone()); + let ivf_model = Arc::new(ivf.clone()); let existing_indices = Arc::new(self.existing_indices.clone()); let distance_type = self.distance_type; - let mut partition_sizes = vec![(0, 0); ivf.num_partitions()]; + let mut partition_sizes = vec![(0, 0); ivf_model.num_partitions()]; let build_iter = partition_build_order.iter().map(|&partition| { let dataset = dataset.clone(); let reader = reader.clone(); @@ -514,11 +515,11 @@ impl IvfIndexBuilder let column = self.column.clone(); let store = self.store.clone(); let temp_dir = self.temp_dir.clone(); - let ivf = ivf.clone(); + let ivf = ivf_model.clone(); let quantizer = quantizer.clone(); let sub_index_params = sub_index_params.clone(); async move { - let batches = Self::take_partition_batches( + let 
(batches, loss) = Self::take_partition_batches( partition, existing_indices.as_ref(), reader.as_ref(), @@ -530,7 +531,7 @@ impl IvfIndexBuilder let num_rows = batches.iter().map(|b| b.num_rows()).sum::(); if num_rows == 0 { - return Ok((0, 0)); + return Ok(((0, 0), 0.0)); } let batch = arrow::compute::concat_batches(&batches[0].schema(), batches.iter())?; @@ -545,6 +546,7 @@ impl IvfIndexBuilder partition, ) .await + .map(|res| (res, loss)) } }); let results = stream::iter(build_iter) @@ -553,9 +555,15 @@ impl IvfIndexBuilder .boxed() .await?; - for (i, result) in results.into_iter().enumerate() { - partition_sizes[partition_build_order[i]] = result; + let mut total_loss = 0.0; + for (i, (res, loss)) in results.into_iter().enumerate() { + total_loss += loss; + partition_sizes[partition_build_order[i]] = res; + } + if let Some(loss) = reader.total_loss() { + total_loss += loss; } + ivf.loss = Some(total_loss); self.partition_sizes = partition_sizes; Ok(self) @@ -617,7 +625,7 @@ impl IvfIndexBuilder dataset: &Arc, column: &str, store: &ObjectStore, - ) -> Result> { + ) -> Result<(Vec, f64)> { let mut batches = Vec::new(); for existing_index in existing_indices.iter() { let existing_index = existing_index @@ -648,15 +656,23 @@ impl IvfIndexBuilder batches.extend(part_batches); } + let mut loss = 0.0; if reader.partition_size(part_id)? > 0 { - let partition_data = reader.read_partition(part_id).await?.ok_or(Error::io( + let mut partition_data = reader.read_partition(part_id).await?.ok_or(Error::io( format!("partition {} is empty", part_id).as_str(), location!(), ))?; - batches.extend(partition_data.try_collect::>().await?); + while let Some(batch) = partition_data.try_next().await? 
{ + loss += batch + .metadata() + .get(LOSS_METADATA_KEY) + .map(|s| s.parse::().unwrap_or(0.0)) + .unwrap_or(0.0); + batches.push(batch); + } } - Ok(batches) + Ok((batches, loss)) } async fn merge_partitions(&mut self) -> Result<()> { diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 6782f7c8e5..1e5726933a 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -614,8 +614,8 @@ mod tests { use arrow::datatypes::{UInt64Type, UInt8Type}; use arrow::{array::AsArray, datatypes::Float32Type}; use arrow_array::{ - Array, ArrayRef, ArrowPrimitiveType, FixedSizeListArray, ListArray, RecordBatch, - RecordBatchIterator, UInt64Array, + Array, ArrayRef, ArrowNativeTypeOp, ArrowPrimitiveType, FixedSizeListArray, ListArray, + RecordBatch, RecordBatchIterator, UInt64Array, }; use arrow_buffer::OffsetBuffer; use arrow_schema::{DataType, Field, Schema, SchemaRef}; @@ -704,7 +704,7 @@ mod tests { where T::Native: SampleUniform, { - const VECTOR_NUM_PER_ROW: usize = 5; + const VECTOR_NUM_PER_ROW: usize = 3; let start_id = start_id.unwrap_or(0); let ids = Arc::new(UInt64Array::from_iter_values( start_id..start_id + num_rows as u64, @@ -717,32 +717,20 @@ mod tests { let data_type = vectors.data_type().clone(); let mut fields = vec![Field::new("id", DataType::UInt64, false)]; let mut arrays: Vec = vec![ids]; - let mut fsl = FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); - if data_type != DataType::UInt8 { - fsl = lance_linalg::kernels::normalize_fsl(&fsl).unwrap(); - } + let fsl = FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); if is_multivector { + let vector_field = Arc::new(Field::new( + "item", + DataType::FixedSizeList(Arc::new(Field::new("item", data_type, true)), DIM as i32), + true, + )); fields.push(Field::new( "vector", - DataType::List(Arc::new(Field::new( - "item", - DataType::FixedSizeList( - Arc::new(Field::new("item", data_type.clone(), 
true)), - DIM as i32, - ), - true, - ))), + DataType::List(vector_field.clone()), true, )); let array = Arc::new(ListArray::new( - Arc::new(Field::new( - "item", - DataType::FixedSizeList( - Arc::new(Field::new("item", data_type, true)), - DIM as i32, - ), - true, - )), + vector_field, OffsetBuffer::from_lengths(std::iter::repeat(VECTOR_NUM_PER_ROW).take(num_rows)), Arc::new(fsl), None, @@ -978,7 +966,7 @@ mod tests { params: VectorIndexParams, range: Range, ) where - T::Native: SampleUniform + std::ops::Add, + T::Native: SampleUniform, { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -1019,7 +1007,10 @@ mod tests { let mut count = 0; // append more rows and make delta index until hitting the retrain threshold loop { - let range = range.start..range.end + range.end + range.end + range.end + range.end; + let range = match count { + 0 => range.clone(), + _ => range.end.neg_wrapping().sub_wrapping(range.end)..range.end.neg_wrapping(), + }; append_dataset::(&mut dataset, 500, range).await; dataset .optimize_indices(&OptimizeOptions { @@ -1032,8 +1023,22 @@ mod tests { let new_avg_loss = get_avg_loss(&dataset).await; if new_avg_loss / original_avg_loss >= *AVG_LOSS_RETRAIN_THRESHOLD { + if count <= 1 { + // the first append is with the same data distribution, so the loss should be + // very close to the original loss, then it shouldn't hit the retrain threshold + panic!( + "retrain threshold {} should not be hit", + *AVG_LOSS_RETRAIN_THRESHOLD + ); + } break; } + if count >= 10 { + panic!( + "failed to hit the retrain threshold {}", + *AVG_LOSS_RETRAIN_THRESHOLD + ); + } // all delta indices should have the same centroids as the original index let ivf_models = get_ivf_models(&dataset).await; @@ -1052,7 +1057,7 @@ mod tests { .await .unwrap(); let stats = dataset.index_statistics("vector_idx").await.unwrap(); - let stats = serde_json::to_value(stats).unwrap(); + let stats: serde_json::Value = serde_json::from_str(&stats).unwrap(); 
assert_eq!(stats["num_indices"], 1); let ivf_models = get_ivf_models(&dataset).await; From b10f1e5631b323e71edc1392c8ec2b270b5f7b60 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 17:59:13 +0800 Subject: [PATCH 16/22] normalize Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf/v2.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 1e5726933a..78b47d3aa3 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -633,6 +633,7 @@ mod tests { use lance_index::{DatasetIndexExt, IndexType}; use lance_linalg::distance::hamming::hamming; use lance_linalg::distance::{multivec_distance, DistanceType}; + use lance_linalg::kernels::normalize_fsl; use lance_testing::datagen::generate_random_array_with_range; use rand::distributions::uniform::SampleUniform; use rstest::rstest; @@ -717,7 +718,10 @@ mod tests { let data_type = vectors.data_type().clone(); let mut fields = vec![Field::new("id", DataType::UInt64, false)]; let mut arrays: Vec = vec![ids]; - let fsl = FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); + let mut fsl = FixedSizeListArray::try_new_from_values(vectors, DIM as i32).unwrap(); + if fsl.value_type() != DataType::UInt8 { + fsl = normalize_fsl(&fsl).unwrap(); + } if is_multivector { let vector_field = Arc::new(Field::new( "item", @@ -1438,7 +1442,7 @@ mod tests { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let nlist = 1000; + let nlist = 500; let (mut dataset, _) = generate_test_dataset::(test_uri, 0.0..1.0).await; let ivf_params = IvfBuildParams::new(nlist); From 52b27c7c941b4b8c083e707a09840663ce72ca8d Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Wed, 12 Mar 2025 19:08:21 +0800 Subject: [PATCH 17/22] fix hamming Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf.rs | 55 +++++++++++++++++++++--------- 1 file 
changed, 38 insertions(+), 17 deletions(-) diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index b8f76d5c4a..6084ce7003 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -36,7 +36,7 @@ use lance_file::{ format::MAGIC, writer::{FileWriter, FileWriterOptions}, }; -use lance_index::vector::flat::index::{FlatIndex, FlatQuantizer}; +use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer}; use lance_index::vector::ivf::storage::IvfModel; use lance_index::vector::pq::storage::transpose; use lance_index::vector::quantizer::QuantizationType; @@ -402,25 +402,46 @@ pub(crate) async fn optimize_vector_indices_v2( }; let indices_to_merge = existing_indices[start_pos..].to_vec(); let merged_num = indices_to_merge.len(); + + let (_, element_type) = get_vector_type(dataset.schema(), vector_column)?; match index_type { // IVF_FLAT (SubIndexType::Flat, QuantizationType::Flat) => { - IvfIndexBuilder::::new_incremental( - dataset.clone(), - vector_column.to_owned(), - index_dir, - distance_type, - shuffler, - (), - )? - .with_ivf(ivf_model.clone()) - .with_quantizer(quantizer.try_into()?) - .with_existing_indices(indices_to_merge) - .retrain(retrain) - .shuffle_data(unindexed) - .await? - .build() - .await?; + if element_type == DataType::UInt8 { + IvfIndexBuilder::::new_incremental( + dataset.clone(), + vector_column.to_owned(), + index_dir, + distance_type, + shuffler, + (), + )? + .with_ivf(ivf_model.clone()) + .with_quantizer(quantizer.try_into()?) + .with_existing_indices(indices_to_merge) + .retrain(retrain) + .shuffle_data(unindexed) + .await? + .build() + .await?; + } else { + IvfIndexBuilder::::new_incremental( + dataset.clone(), + vector_column.to_owned(), + index_dir, + distance_type, + shuffler, + (), + )? + .with_ivf(ivf_model.clone()) + .with_quantizer(quantizer.try_into()?) 
+ .with_existing_indices(indices_to_merge) + .retrain(retrain) + .shuffle_data(unindexed) + .await? + .build() + .await?; + } } // IVF_PQ (SubIndexType::Flat, QuantizationType::Product) => { From c0ccaa66a570f31f9e23aacd8403ffa0c151013b Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Thu, 13 Mar 2025 12:48:46 +0800 Subject: [PATCH 18/22] fix Signed-off-by: BubbleCal --- rust/lance/src/index/vector/builder.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index a3c55f0be8..f22f32eedc 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -472,10 +472,6 @@ impl IvfIndexBuilder #[instrument(name = "build_partitions", level = "debug", skip_all)] async fn build_partitions(&mut self) -> Result<&mut Self> { - let dataset = self.dataset.as_ref().ok_or(Error::invalid_input( - "dataset not set before building partitions", - location!(), - ))?; let ivf = self.ivf.as_mut().ok_or(Error::invalid_input( "IVF not set before building partitions", location!(), From 7c841d33d2e0a023f6ecfa9a911af0fe7348fba3 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 14 Mar 2025 14:49:56 +0800 Subject: [PATCH 19/22] fix Signed-off-by: BubbleCal --- rust/lance-index/src/optimize.rs | 50 +++++++++++++++++++++++++ rust/lance/src/index.rs | 22 +++-------- rust/lance/src/index/vector/ivf.rs | 53 +++++++-------------------- rust/lance/src/index/vector/ivf/v2.rs | 46 +++++++++++++---------- 4 files changed, 95 insertions(+), 76 deletions(-) diff --git a/rust/lance-index/src/optimize.rs b/rust/lance-index/src/optimize.rs index 5f9b6a78ed..558640f2d5 100644 --- a/rust/lance-index/src/optimize.rs +++ b/rust/lance-index/src/optimize.rs @@ -20,6 +20,19 @@ pub struct OptimizeOptions { /// the index names to optimize. If None, all indices will be optimized. pub index_names: Option>, + + /// whether to retrain the whole index. Default: false. 
+ /// + /// If true, the index will be retrained based on the current data, + /// `num_indices_to_merge` will be ignored, and all indices will be merged into one. + /// If false, the index will be optimized by merging `num_indices_to_merge` indices. + /// + /// This is useful when the data distribution has changed significantly, + /// and we want to retrain the index to improve the search quality. + /// This would be faster than re-create the index from scratch. + /// + /// NOTE: this option is only supported for v3 vector indices. + pub retrain: bool, } impl Default for OptimizeOptions { @@ -27,6 +40,43 @@ impl Default for OptimizeOptions { Self { num_indices_to_merge: 1, index_names: None, + retrain: false, } } } + +impl OptimizeOptions { + pub fn new() -> Self { + Self { + num_indices_to_merge: 1, + index_names: None, + retrain: false, + } + } + + pub fn append() -> Self { + Self { + num_indices_to_merge: 0, + index_names: None, + retrain: false, + } + } + + pub fn retrain() -> Self { + Self { + num_indices_to_merge: 0, + index_names: None, + retrain: true, + } + } + + pub fn num_indices_to_merge(mut self, num: usize) -> Self { + self.num_indices_to_merge = num; + self + } + + pub fn index_names(mut self, names: Vec) -> Self { + self.index_names = Some(names); + self + } +} diff --git a/rust/lance/src/index.rs b/rust/lance/src/index.rs index 9e9df5ef4c..fa8c04d477 100644 --- a/rust/lance/src/index.rs +++ b/rust/lance/src/index.rs @@ -1359,10 +1359,7 @@ mod tests { assert_eq!(get_bitmap(&meta[0]), vec![0]); dataset - .optimize_indices(&OptimizeOptions { - num_indices_to_merge: 0, // Just create index for delta - index_names: Some(vec![]), // Optimize nothing - }) + .optimize_indices(&OptimizeOptions::append().index_names(vec![])) // Does nothing because no index name is passed .await .unwrap(); let stats = get_stats(&dataset, "vec_idx").await; @@ -1377,10 +1374,9 @@ mod tests { // optimize the other index dataset - .optimize_indices(&OptimizeOptions { - 
num_indices_to_merge: 0, // Just create index for delta - index_names: Some(vec!["other_vec_idx".to_string()]), - }) + .optimize_indices( + &OptimizeOptions::append().index_names(vec!["other_vec_idx".to_owned()]), + ) .await .unwrap(); let stats = get_stats(&dataset, "vec_idx").await; @@ -1586,10 +1582,7 @@ mod tests { assert_indexed_rows(&dataset, num_rows).await; dataset - .optimize_indices(&OptimizeOptions { - num_indices_to_merge: 0, - index_names: None, - }) + .optimize_indices(&OptimizeOptions::append()) .await .unwrap(); let num_rows = dataset.count_all_rows().await.unwrap(); @@ -1680,10 +1673,7 @@ mod tests { } dataset - .optimize_indices(&OptimizeOptions { - num_indices_to_merge: 0, - index_names: None, - }) + .optimize_indices(&OptimizeOptions::append()) .await .unwrap(); let num_rows = dataset.count_all_rows().await.unwrap(); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 2cae91c976..c5dc0b3f97 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -70,8 +70,7 @@ use lance_linalg::{ distance::Normalize, kernels::{normalize_arrow, normalize_fsl}, }; -use lazy_static::lazy_static; -use log::{info, warn}; +use log::info; use object_store::path::Path; use rand::{rngs::SmallRng, SeedableRng}; use roaring::RoaringBitmap; @@ -94,13 +93,6 @@ use crate::{ session::Session, }; -lazy_static! 
{ - static ref AVG_LOSS_RETRAIN_THRESHOLD: f64 = std::env::var("LANCE_AVG_LOSS_RETRAIN_THRESHOLD") - .unwrap_or("1.1".to_string()) - .parse() - .expect("LANCE_AVG_LOSS_RETRAIN_THRESHOLD must be a float number"); -} - pub mod builder; pub mod io; pub mod v2; @@ -366,32 +358,11 @@ pub(crate) async fn optimize_vector_indices_v2( let num_partitions = ivf_model.num_partitions(); let index_type = existing_indices[0].sub_index_type(); - let original_avg_loss = ivf_model - .loss() - .map(|loss| loss / existing_indices[0].num_rows() as f64); - let total_loss = existing_indices - .iter() - .map(|idx| idx.ivf_model().loss().unwrap_or_default()) - .sum::(); - let total_rows = existing_indices - .iter() - .map(|idx| idx.num_rows()) - .sum::(); - let avg_loss = total_loss / total_rows as f64; - - let mut num_indices_to_merge = options.num_indices_to_merge; - let mut retrain = false; - if let Some(original_avg_loss) = original_avg_loss { - if avg_loss >= original_avg_loss * *AVG_LOSS_RETRAIN_THRESHOLD { - warn!( - "average loss {} of the indices is too high (> {} * {}), retrain the index", - avg_loss, original_avg_loss, *AVG_LOSS_RETRAIN_THRESHOLD - ); - retrain = true; - num_indices_to_merge = existing_indices.len(); - } - } - + let num_indices_to_merge = if options.retrain { + existing_indices.len() + } else { + options.num_indices_to_merge + }; let temp_dir = tempfile::tempdir()?; let temp_dir_path = Path::from_filesystem_path(temp_dir.path())?; let shuffler = Box::new(IvfShuffler::new(temp_dir_path, num_partitions)); @@ -419,7 +390,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) - .retrain(retrain) + .retrain(options.retrain) .shuffle_data(unindexed) .await? .build() @@ -436,7 +407,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) 
.with_existing_indices(indices_to_merge) - .retrain(retrain) + .retrain(options.retrain) .shuffle_data(unindexed) .await? .build() @@ -456,7 +427,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) - .retrain(retrain) + .retrain(options.retrain) .shuffle_data(unindexed) .await? .build() @@ -478,7 +449,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) - .retrain(retrain) + .retrain(options.retrain) .shuffle_data(unindexed) .await? .build() @@ -500,7 +471,7 @@ pub(crate) async fn optimize_vector_indices_v2( .with_ivf(ivf_model.clone()) .with_quantizer(quantizer.try_into()?) .with_existing_indices(indices_to_merge) - .retrain(retrain) + .retrain(options.retrain) .shuffle_data(unindexed) .await? .build() @@ -764,6 +735,7 @@ pub struct IvfIndexStatistics { sub_index: serde_json::Value, partitions: Vec, centroids: Vec>, + loss: Option, } fn centroids_to_vectors(centroids: &FixedSizeListArray) -> Result>> { @@ -863,6 +835,7 @@ impl Index for IVFIndex { sub_index: self.sub_index.statistics()?, partitions: partitions_statistics, centroids: centroid_vecs, + loss: self.ivf.loss(), })?) 
} diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 78b47d3aa3..b56f7f460d 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -398,6 +398,7 @@ impl Index for IVFIndex Vec> { + let stats = dataset.index_statistics("vector_idx").await.unwrap(); + let stats: serde_json::Value = serde_json::from_str(&stats).unwrap(); + stats["indices"] + .as_array() + .unwrap() + .into_iter() + .map(|s| s.get("loss").map(|l| l.as_f64())) + .flatten() + .collect() + } + async fn get_avg_loss(dataset: &Dataset) -> f64 { - let ivf_models = get_ivf_models(dataset).await; - let mut loss = 0.0; - for ivf in ivf_models { - loss += ivf.loss.unwrap_or_default(); - } + let losses = get_losses(dataset).await; + let total_loss = losses.iter().filter_map(|l| *l).sum::(); let num_rows = dataset.count_rows(None).await.unwrap(); - loss / num_rows as f64 + total_loss / num_rows as f64 } - let original_ivf = get_ivf_models(&dataset).await; + const AVG_LOSS_RETRAIN_THRESHOLD: f64 = 1.1; + let original_ivfs = get_ivf_models(&dataset).await; let original_avg_loss = get_avg_loss(&dataset).await; - let original_ivf = &original_ivf[0]; + let original_ivf = &original_ivfs[0]; let mut count = 0; // append more rows and make delta index until hitting the retrain threshold loop { @@ -1017,30 +1028,28 @@ mod tests { }; append_dataset::(&mut dataset, 500, range).await; dataset - .optimize_indices(&OptimizeOptions { - num_indices_to_merge: 0, - index_names: None, - }) + .optimize_indices(&OptimizeOptions::append()) .await .unwrap(); count += 1; let new_avg_loss = get_avg_loss(&dataset).await; - if new_avg_loss / original_avg_loss >= *AVG_LOSS_RETRAIN_THRESHOLD { + if new_avg_loss / original_avg_loss >= AVG_LOSS_RETRAIN_THRESHOLD { if count <= 1 { // the first append is with the same data distribution, so the loss should be // very close to the original loss, then it shouldn't hit the retrain threshold panic!( "retrain 
threshold {} should not be hit", - *AVG_LOSS_RETRAIN_THRESHOLD + AVG_LOSS_RETRAIN_THRESHOLD ); } + break; } if count >= 10 { panic!( "failed to hit the retrain threshold {}", - *AVG_LOSS_RETRAIN_THRESHOLD + AVG_LOSS_RETRAIN_THRESHOLD ); } @@ -1054,10 +1063,7 @@ mod tests { // this optimize would merge all indices and retrain the IVF dataset - .optimize_indices(&OptimizeOptions { - num_indices_to_merge: 0, - index_names: None, - }) + .optimize_indices(&OptimizeOptions::retrain()) .await .unwrap(); let stats = dataset.index_statistics("vector_idx").await.unwrap(); From d22980fce837d4c6a1d090cbdb20c0903e5b06a6 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 14 Mar 2025 14:59:36 +0800 Subject: [PATCH 20/22] add python test Signed-off-by: BubbleCal --- python/python/lance/dataset.py | 9 +++++++++ python/python/tests/test_vector_index.py | 15 +++++++++++++++ python/src/dataset.rs | 3 +++ rust/lance/src/index/vector/ivf.rs | 8 +++++++- 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 57dacdb580..33cd723ff3 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3561,6 +3561,15 @@ def optimize_indices(self, **kwargs): index_names: List[str], default None The names of the indices to optimize. If None, all indices will be optimized. + retrain: bool, default False + Whether to retrain the whole index. + If true, the index will be retrained based on the current data, + `num_indices_to_merge` will be ignored, + and all indices will be merged into one. + + This is useful when the data distribution has changed significantly, + and we want to retrain the index to improve the search quality. + This would be faster than re-create the index from scratch. 
""" self._dataset._ds.optimize_indices(**kwargs) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index af7c2838ad..41db4a3856 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1121,6 +1121,21 @@ def test_optimize_indices(indexed_dataset): assert len(indices) == 2 +def test_retrain_indices(indexed_dataset): + data = create_table() + indexed_dataset = lance.write_dataset(data, indexed_dataset.uri, mode="append") + indices = indexed_dataset.list_indices() + assert len(indices) == 1 + + indexed_dataset.optimize.optimize_indices(num_indices_to_merge=0) + indices = indexed_dataset.list_indices() + assert len(indices) == 2 + + indexed_dataset.optimize.optimize_indices(retrain=True) + indices = indexed_dataset.list_indices() + assert len(indices) == 1 + + def test_no_include_deleted_rows(indexed_dataset): with pytest.raises(ValueError, match="Cannot include deleted rows"): indexed_dataset.to_table( diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 39a00d7fb1..5813795419 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1152,6 +1152,9 @@ impl Dataset { .map_err(|err| PyValueError::new_err(err.to_string()))?, ); } + if let Some(retrain) = kwargs.get_item("retrain")? 
{ + options.retrain = retrain.extract()?; + } } RT.block_on( None, diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index c5dc0b3f97..794b40b8c1 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -70,7 +70,7 @@ use lance_linalg::{ distance::Normalize, kernels::{normalize_arrow, normalize_fsl}, }; -use log::info; +use log::{info, warn}; use object_store::path::Path; use rand::{rngs::SmallRng, SeedableRng}; use roaring::RoaringBitmap; @@ -270,6 +270,12 @@ pub(crate) async fn optimize_vector_indices( .await; } + if options.retrain { + warn!( + "optimizing vector index: retrain is only supported for v3 vector indices, falling back to normal optimization. please re-create the index with lance>=0.25.0 to enable retrain." + ); + } + let new_uuid = Uuid::new_v4(); let object_store = dataset.object_store(); let index_file = dataset From 5251588f4c57088ae137c80a45025f9ee6a41158 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 14 Mar 2025 15:09:54 +0800 Subject: [PATCH 21/22] fix Signed-off-by: BubbleCal --- python/python/tests/test_vector_index.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 41db4a3856..025624439c 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -643,6 +643,7 @@ def test_pre_populated_ivf_centroids(dataset, tmp_path: Path): idx_stats = actual_statistics["indices"][0] partitions = idx_stats.pop("partitions") idx_stats.pop("centroids") + idx_stats.pop("loss") assert idx_stats == expected_statistics assert len(partitions) == 5 partition_keys = {"size"} From 52ee373f84d01000756c90c1415d365d3f8bdbd1 Mon Sep 17 00:00:00 2001 From: BubbleCal Date: Fri, 14 Mar 2025 16:07:27 +0800 Subject: [PATCH 22/22] fmt Signed-off-by: BubbleCal --- rust/lance/src/index/vector/ivf/v2.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git 
a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index b56f7f460d..77d3efb5d5 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -642,7 +642,6 @@ mod tests { use crate::dataset::optimize::{compact_files, CompactionOptions}; use crate::dataset::UpdateBuilder; - use crate::index::vector::ivf::IvfIndexStatistics; use crate::index::DatasetIndexInternalExt; use crate::{index::vector::VectorIndexParams, Dataset}; @@ -1002,9 +1001,8 @@ mod tests { stats["indices"] .as_array() .unwrap() - .into_iter() - .map(|s| s.get("loss").map(|l| l.as_f64())) - .flatten() + .iter() + .flat_map(|s| s.get("loss").map(|l| l.as_f64())) .collect() }