Skip to content

Commit 89a33b7

Browse files
authored
feat: cache v3 index partitions in dataset session (#3467)
For the v3 vector index, before this change we cached the IVF partitions inside the IVF struct, which differs from v1: v1 caches all partitions in the global dataset session. This moves the partition cache into the dataset session, just like the v1 index, so that all partitions are managed in a single cache pool and total memory usage can be controlled more effectively. --------- Signed-off-by: BubbleCal <bubble-cal@outlook.com>
1 parent 33ae43b commit 89a33b7

File tree

4 files changed

+131
-78
lines changed

4 files changed

+131
-78
lines changed

rust/lance-index/src/vector.rs

+8
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@
44
//! Vector Index
55
//!
66
7+
use std::any::Any;
8+
use std::fmt::Debug;
79
use std::{collections::HashMap, sync::Arc};
810

911
use arrow_array::{ArrayRef, RecordBatch, UInt32Array};
1012
use arrow_schema::Field;
1113
use async_trait::async_trait;
1214
use datafusion::execution::SendableRecordBatchStream;
15+
use deepsize::DeepSizeOf;
1316
use ivf::storage::IvfModel;
1417
use lance_core::{Result, ROW_ID_FIELD};
1518
use lance_io::object_store::ObjectStore;
@@ -228,3 +231,8 @@ pub trait VectorIndex: Send + Sync + std::fmt::Debug + Index {
228231
/// the index type of this vector index.
229232
fn sub_index_type(&self) -> (SubIndexType, QuantizationType);
230233
}
234+
235+
// It can be an IVF index or a partition of an IVF index.
236+
pub trait VectorIndexCacheEntry: Debug + Send + Sync + DeepSizeOf {
237+
fn as_any(&self) -> &dyn Any;
238+
}

rust/lance/src/index/cache.rs

+25-9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::sync::Arc;
55

66
use deepsize::DeepSizeOf;
7+
use lance_index::vector::VectorIndexCacheEntry;
78
use lance_index::{
89
scalar::{ScalarIndex, ScalarIndexType},
910
vector::VectorIndex,
@@ -13,8 +14,6 @@ use moka::sync::Cache;
1314

1415
use std::sync::atomic::{AtomicU64, Ordering};
1516

16-
use crate::dataset::DEFAULT_INDEX_CACHE_SIZE;
17-
1817
#[derive(Debug, Default, DeepSizeOf)]
1918
struct CacheStats {
2019
hits: AtomicU64,
@@ -36,6 +35,8 @@ pub struct IndexCache {
3635
// TODO: Can we merge these two caches into one for uniform memory management?
3736
scalar_cache: Arc<Cache<String, Arc<dyn ScalarIndex>>>,
3837
vector_cache: Arc<Cache<String, Arc<dyn VectorIndex>>>,
38+
// This cache is for the v3 index; unfortunately, it cannot share the vector index cache above for now.
39+
vector_partition_cache: Arc<Cache<String, Arc<dyn VectorIndexCacheEntry>>>,
3940

4041
/// Index metadata cache.
4142
///
@@ -61,6 +62,11 @@ impl DeepSizeOf for IndexCache {
6162
.iter()
6263
.map(|(_, v)| v.deep_size_of_children(context))
6364
.sum::<usize>()
65+
+ self
66+
.vector_partition_cache
67+
.iter()
68+
.map(|(_, v)| v.deep_size_of_children(context))
69+
.sum::<usize>()
6470
+ self
6571
.metadata_cache
6672
.iter()
@@ -75,19 +81,13 @@ impl IndexCache {
7581
Self {
7682
scalar_cache: Arc::new(Cache::new(capacity as u64)),
7783
vector_cache: Arc::new(Cache::new(capacity as u64)),
84+
vector_partition_cache: Arc::new(Cache::new(capacity as u64)),
7885
metadata_cache: Arc::new(Cache::new(capacity as u64)),
7986
type_cache: Arc::new(Cache::new(capacity as u64)),
8087
cache_stats: Arc::new(CacheStats::default()),
8188
}
8289
}
8390

84-
pub(crate) fn capacity(&self) -> u64 {
85-
self.vector_cache
86-
.policy()
87-
.max_capacity()
88-
.unwrap_or(DEFAULT_INDEX_CACHE_SIZE as u64)
89-
}
90-
9191
#[allow(dead_code)]
9292
pub(crate) fn len_vector(&self) -> usize {
9393
self.vector_cache.run_pending_tasks();
@@ -97,9 +97,11 @@ impl IndexCache {
9797
pub(crate) fn get_size(&self) -> usize {
9898
self.scalar_cache.run_pending_tasks();
9999
self.vector_cache.run_pending_tasks();
100+
self.vector_partition_cache.run_pending_tasks();
100101
self.metadata_cache.run_pending_tasks();
101102
(self.scalar_cache.entry_count()
102103
+ self.vector_cache.entry_count()
104+
+ self.vector_partition_cache.entry_count()
103105
+ self.metadata_cache.entry_count()) as usize
104106
}
105107

@@ -134,6 +136,16 @@ impl IndexCache {
134136
}
135137
}
136138

139+
pub(crate) fn get_vector_partition(&self, key: &str) -> Option<Arc<dyn VectorIndexCacheEntry>> {
140+
if let Some(index) = self.vector_partition_cache.get(key) {
141+
self.cache_stats.record_hit();
142+
Some(index)
143+
} else {
144+
self.cache_stats.record_miss();
145+
None
146+
}
147+
}
148+
137149
/// Insert a new entry into the cache.
138150
pub(crate) fn insert_scalar(&self, key: &str, index: Arc<dyn ScalarIndex>) {
139151
self.scalar_cache.insert(key.to_string(), index);
@@ -143,6 +155,10 @@ impl IndexCache {
143155
self.vector_cache.insert(key.to_string(), index);
144156
}
145157

158+
pub(crate) fn insert_vector_partition(&self, key: &str, index: Arc<dyn VectorIndexCacheEntry>) {
159+
self.vector_partition_cache.insert(key.to_string(), index);
160+
}
161+
146162
/// Construct a key for index metadata arrays.
147163
fn metadata_key(dataset_uuid: &str, version: u64) -> String {
148164
format!("{}:{}", dataset_uuid, version)

rust/lance/src/index/vector/builder.rs

+7
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use tempfile::{tempdir, TempDir};
5656
use tracing::{span, Level};
5757

5858
use crate::dataset::ProjectionRequest;
59+
use crate::index::vector::ivf::v2::PartitionEntry;
5960
use crate::Dataset;
6061

6162
use super::utils;
@@ -221,6 +222,12 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> IvfIndexBuilder<S, Q>
221222
let mapped = stream::iter(0..model.num_partitions())
222223
.map(|part_id| async move {
223224
let part = ivf_index.load_partition(part_id, false).await?;
225+
let part = part.as_any().downcast_ref::<PartitionEntry<S, Q>>().ok_or(
226+
Error::Internal {
227+
message: "failed to downcast partition entry".to_string(),
228+
location: location!(),
229+
},
230+
)?;
224231
Result::Ok((part.storage.remap(mapping)?, part.index.remap(mapping)?))
225232
})
226233
.buffered(get_num_compute_intensive_cpus())

rust/lance/src/index/vector/ivf/v2.rs

+91-69
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use lance_index::vector::quantizer::{QuantizationType, Quantizer};
3535
use lance_index::vector::sq::ScalarQuantizer;
3636
use lance_index::vector::storage::VectorStore;
3737
use lance_index::vector::v3::subindex::SubIndexType;
38+
use lance_index::vector::VectorIndexCacheEntry;
3839
use lance_index::{
3940
pb,
4041
vector::{
@@ -49,7 +50,6 @@ use lance_io::{
4950
object_store::ObjectStore, scheduler::ScanScheduler, traits::Reader, ReadBatchParams,
5051
};
5152
use lance_linalg::{distance::DistanceType, kernels::normalize_arrow};
52-
use moka::sync::Cache;
5353
use object_store::path::Path;
5454
use prost::Message;
5555
use roaring::RoaringBitmap;
@@ -68,12 +68,20 @@ use crate::{
6868

6969
use super::{centroids_to_vectors, IvfIndexPartitionStatistics, IvfIndexStatistics};
7070

71-
#[derive(Debug)]
71+
#[derive(Debug, DeepSizeOf)]
7272
pub struct PartitionEntry<S: IvfSubIndex, Q: Quantization> {
7373
pub index: S,
7474
pub storage: Q::Storage,
7575
}
7676

77+
impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> VectorIndexCacheEntry
78+
for PartitionEntry<S, Q>
79+
{
80+
fn as_any(&self) -> &dyn Any {
81+
self
82+
}
83+
}
84+
7785
/// IVF Index.
7886
#[derive(Debug)]
7987
pub struct IVFIndex<S: IvfSubIndex + 'static, Q: Quantization + 'static> {
@@ -86,9 +94,6 @@ pub struct IVFIndex<S: IvfSubIndex + 'static, Q: Quantization + 'static> {
8694
sub_index_metadata: Vec<String>,
8795
storage: IvfQuantizationStorage,
8896

89-
/// Index in each partition.
90-
partition_cache: Cache<String, Arc<PartitionEntry<S, Q>>>,
91-
9297
partition_locks: PartitionLoadLock,
9398

9499
distance_type: DistanceType,
@@ -98,7 +103,7 @@ pub struct IVFIndex<S: IvfSubIndex + 'static, Q: Quantization + 'static> {
98103
/// The session cache, used when fetching pages
99104
#[allow(dead_code)]
100105
session: Weak<Session>,
101-
_marker: PhantomData<Q>,
106+
_marker: PhantomData<(S, Q)>,
102107
}
103108

104109
impl<S: IvfSubIndex, Q: Quantization> DeepSizeOf for IVFIndex<S, Q> {
@@ -123,7 +128,6 @@ impl<S: IvfSubIndex + 'static, Q: Quantization> IVFIndex<S, Q> {
123128
.upgrade()
124129
.map(|sess| sess.file_metadata_cache.clone())
125130
.unwrap_or_else(FileMetadataCache::no_cache);
126-
let index_cache_capacity = session.upgrade().unwrap().index_cache.capacity();
127131
let index_reader = FileReader::try_open(
128132
scheduler
129133
.open_file(&index_dir.child(uuid.as_str()).child(INDEX_FILE_NAME))
@@ -195,7 +199,6 @@ impl<S: IvfSubIndex + 'static, Q: Quantization> IVFIndex<S, Q> {
195199
ivf,
196200
reader: index_reader,
197201
storage,
198-
partition_cache: Cache::new(index_cache_capacity),
199202
partition_locks: PartitionLoadLock::new(num_partitions),
200203
sub_index_metadata,
201204
distance_type,
@@ -209,70 +212,76 @@ impl<S: IvfSubIndex + 'static, Q: Quantization> IVFIndex<S, Q> {
209212
&self,
210213
partition_id: usize,
211214
write_cache: bool,
212-
) -> Result<Arc<PartitionEntry<S, Q>>> {
215+
) -> Result<Arc<dyn VectorIndexCacheEntry>> {
213216
let cache_key = format!("{}-ivf-{}", self.uuid, partition_id);
214-
let part_entry = if let Some(part_idx) = self.partition_cache.get(&cache_key) {
215-
part_idx
216-
} else {
217-
if partition_id >= self.ivf.num_partitions() {
218-
return Err(Error::Index {
219-
message: format!(
220-
"partition id {} is out of range of {} partitions",
221-
partition_id,
222-
self.ivf.num_partitions()
223-
),
224-
location: location!(),
225-
});
226-
}
227-
228-
let mtx = self.partition_locks.get_partition_mutex(partition_id);
229-
let _guard = mtx.lock().await;
230-
231-
// check the cache again, as the partition may have been loaded by another
232-
// thread that held the lock on loading the partition
233-
if let Some(part_idx) = self.partition_cache.get(&cache_key) {
217+
let session = self.session.upgrade().ok_or(Error::Internal {
218+
message: "attempt to use index after dataset was destroyed".into(),
219+
location: location!(),
220+
})?;
221+
let part_entry =
222+
if let Some(part_idx) = session.index_cache.get_vector_partition(&cache_key) {
234223
part_idx
235224
} else {
236-
let schema = Arc::new(self.reader.schema().as_ref().into());
237-
let batch = match self.reader.metadata().num_rows {
238-
0 => RecordBatch::new_empty(schema),
239-
_ => {
240-
let row_range = self.ivf.row_range(partition_id);
241-
if row_range.is_empty() {
242-
RecordBatch::new_empty(schema)
243-
} else {
244-
let batches = self
245-
.reader
246-
.read_stream(
247-
ReadBatchParams::Range(row_range),
248-
u32::MAX,
249-
1,
250-
FilterExpression::no_filter(),
251-
)?
252-
.try_collect::<Vec<_>>()
253-
.await?;
254-
concat_batches(&schema, batches.iter())?
225+
if partition_id >= self.ivf.num_partitions() {
226+
return Err(Error::Index {
227+
message: format!(
228+
"partition id {} is out of range of {} partitions",
229+
partition_id,
230+
self.ivf.num_partitions()
231+
),
232+
location: location!(),
233+
});
234+
}
235+
236+
let mtx = self.partition_locks.get_partition_mutex(partition_id);
237+
let _guard = mtx.lock().await;
238+
239+
// check the cache again, as the partition may have been loaded by another
240+
// thread that held the lock on loading the partition
241+
if let Some(part_idx) = session.index_cache.get_vector_partition(&cache_key) {
242+
part_idx
243+
} else {
244+
let schema = Arc::new(self.reader.schema().as_ref().into());
245+
let batch = match self.reader.metadata().num_rows {
246+
0 => RecordBatch::new_empty(schema),
247+
_ => {
248+
let row_range = self.ivf.row_range(partition_id);
249+
if row_range.is_empty() {
250+
RecordBatch::new_empty(schema)
251+
} else {
252+
let batches = self
253+
.reader
254+
.read_stream(
255+
ReadBatchParams::Range(row_range),
256+
u32::MAX,
257+
1,
258+
FilterExpression::no_filter(),
259+
)?
260+
.try_collect::<Vec<_>>()
261+
.await?;
262+
concat_batches(&schema, batches.iter())?
263+
}
255264
}
265+
};
266+
let batch = batch.add_metadata(
267+
S::metadata_key().to_owned(),
268+
self.sub_index_metadata[partition_id].clone(),
269+
)?;
270+
let idx = S::load(batch)?;
271+
let storage = self.load_partition_storage(partition_id).await?;
272+
let partition_entry = Arc::new(PartitionEntry::<S, Q> {
273+
index: idx,
274+
storage,
275+
});
276+
if write_cache {
277+
session
278+
.index_cache
279+
.insert_vector_partition(&cache_key, partition_entry.clone());
256280
}
257-
};
258-
let batch = batch.add_metadata(
259-
S::metadata_key().to_owned(),
260-
self.sub_index_metadata[partition_id].clone(),
261-
)?;
262-
let idx = S::load(batch)?;
263-
let storage = self.load_partition_storage(partition_id).await?;
264-
let partition_entry = Arc::new(PartitionEntry {
265-
index: idx,
266-
storage,
267-
});
268-
if write_cache {
269-
self.partition_cache
270-
.insert(cache_key.clone(), partition_entry.clone());
271-
}
272281

273-
partition_entry
274-
}
275-
};
282+
partition_entry
283+
}
284+
};
276285

277286
Ok(part_entry)
278287
}
@@ -428,9 +437,15 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> VectorIndex for IVFInd
428437
let param = (&query).into();
429438
let refine_factor = query.refine_factor.unwrap_or(1) as usize;
430439
let k = query.k * refine_factor;
431-
part_entry
432-
.index
433-
.search(query.key, k, param, &part_entry.storage, pre_filter)
440+
let part = part_entry
441+
.as_any()
442+
.downcast_ref::<PartitionEntry<S, Q>>()
443+
.ok_or(Error::Internal {
444+
message: "failed to downcast partition entry".to_string(),
445+
location: location!(),
446+
})?;
447+
part.index
448+
.search(query.key, k, param, &part.storage, pre_filter)
434449
})
435450
.await
436451
}
@@ -465,6 +480,13 @@ impl<S: IvfSubIndex + 'static, Q: Quantization + 'static> VectorIndex for IVFInd
465480
with_vector: bool,
466481
) -> Result<SendableRecordBatchStream> {
467482
let partition = self.load_partition(partition_id, false).await?;
483+
let partition = partition
484+
.as_any()
485+
.downcast_ref::<PartitionEntry<S, Q>>()
486+
.ok_or(Error::Internal {
487+
message: "failed to downcast partition entry".to_string(),
488+
location: location!(),
489+
})?;
468490
let store = &partition.storage;
469491
let schema = if with_vector {
470492
store.schema().clone()

0 commit comments

Comments
 (0)