Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Configurable block size #2439

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ proptest-state-machine = "0.1.0"
rayon = "1.8.0"
criterion = "0.3"
random-port = "0.1.1"
serial_test = "3.1.1"

[build-dependencies]
tonic-build = "0.10"
Expand Down
6 changes: 6 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ query_service:
num_worker_threads: 4
dispatcher_queue_size: 100
worker_queue_size: 100
blockfile_provider:
Arrow:
max_block_size_bytes: 16384

compaction_service:
service_name: "compaction-service"
Expand Down Expand Up @@ -79,3 +82,6 @@ compaction_service:
max_concurrent_jobs: 100
compaction_interval_sec: 60
min_compaction_size: 10
blockfile_provider:
Arrow:
max_block_size_bytes: 16384
28 changes: 15 additions & 13 deletions rust/worker/src/blockstore/arrow/block/delta.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use super::delta_storage::BlockStorage;
use crate::blockstore::{
arrow::{
blockfile::MAX_BLOCK_SIZE,
types::{ArrowWriteableKey, ArrowWriteableValue},
},
arrow::types::{ArrowWriteableKey, ArrowWriteableValue},
key::CompositeKey,
};
use arrow::{array::RecordBatch, util::bit_util};
Expand All @@ -28,6 +25,8 @@ pub struct BlockDelta {

impl BlockDelta {
/// Creates a new block delta from a block.
/// # Arguments
/// - id: the id of the block delta.
pub fn new<K: ArrowWriteableKey, V: ArrowWriteableValue>(id: Uuid) -> Self {
BlockDelta {
builder: V::get_delta_builder(),
Expand Down Expand Up @@ -122,8 +121,9 @@ impl BlockDelta {
/// split point.
pub fn split<'referred_data, K: ArrowWriteableKey, V: ArrowWriteableValue>(
&'referred_data self,
max_block_size_bytes: usize,
) -> Vec<(CompositeKey, BlockDelta)> {
let half_size = MAX_BLOCK_SIZE / 2;
let half_size = max_block_size_bytes / 2;

let mut blocks_to_split = Vec::new();
blocks_to_split.push(self.clone());
Expand Down Expand Up @@ -174,7 +174,7 @@ impl BlockDelta {
} else {
output.push((curr_block.builder.get_key(0).clone(), curr_block));
}
if new_block.get_size::<K, V>() > MAX_BLOCK_SIZE {
if new_block.get_size::<K, V>() > max_block_size_bytes {
blocks_to_split.push(new_block);
} else {
output.push((split_key.clone(), new_block));
Expand All @@ -193,7 +193,9 @@ impl BlockDelta {
mod test {
use super::*;
use crate::{
blockstore::arrow::{block::Block, provider::BlockManager},
blockstore::arrow::{
block::Block, config::TEST_MAX_BLOCK_SIZE_BYTES, provider::BlockManager,
},
segment::DataRecord,
storage::{local::LocalStorage, Storage},
types::MetadataValue,
Expand Down Expand Up @@ -223,7 +225,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let delta = block_manager.create::<&str, &Int32Array>();

let n = 2000;
Expand Down Expand Up @@ -254,7 +256,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let delta = block_manager.create::<&str, &str>();
let delta_id = delta.id.clone();

Expand Down Expand Up @@ -300,7 +302,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let delta = block_manager.create::<f32, &str>();

let n = 2000;
Expand All @@ -325,7 +327,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let delta = block_manager.create::<&str, &RoaringBitmap>();

let n = 2000;
Expand Down Expand Up @@ -357,7 +359,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let ids = vec!["embedding_id_2", "embedding_id_0", "embedding_id_1"];
let embeddings = vec![
vec![1.0, 2.0, 3.0],
Expand Down Expand Up @@ -418,7 +420,7 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage);
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let delta = block_manager.create::<u32, &str>();

let n = 2000;
Expand Down
46 changes: 21 additions & 25 deletions rust/worker/src/blockstore/arrow/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use std::{collections::HashSet, mem::transmute};
use thiserror::Error;
use uuid::Uuid;

pub(super) const MAX_BLOCK_SIZE: usize = 16384;

#[derive(Clone)]
pub(crate) struct ArrowBlockfileWriter {
block_manager: BlockManager,
Expand Down Expand Up @@ -161,8 +159,8 @@ impl ArrowBlockfileWriter {
// Add the key, value pair to delta.
// Then check if it's over size and split as needed
delta.add(prefix, key, value);
if delta.get_size::<K, V>() > MAX_BLOCK_SIZE {
let new_blocks = delta.split::<K, V>();
if delta.get_size::<K, V>() > self.block_manager.max_block_size_bytes() {
let new_blocks = delta.split::<K, V>(self.block_manager.max_block_size_bytes());
for (split_key, new_delta) in new_blocks {
self.sparse_index.add_block(split_key, new_delta.id);
let mut deltas = self.block_deltas.lock();
Expand Down Expand Up @@ -531,8 +529,7 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
#[cfg(test)]
mod tests {
use crate::{
blockstore::arrow::{blockfile::MAX_BLOCK_SIZE, provider::ArrowBlockfileProvider},
log::config::{self, GrpcLogConfig},
blockstore::arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
segment::DataRecord,
storage::{local::LocalStorage, Storage},
types::MetadataValue,
Expand All @@ -541,17 +538,14 @@ mod tests {
use proptest::prelude::*;
use proptest::test_runner::Config;
use rand::seq::IteratorRandom;
use std::{
collections::HashMap,
time::{SystemTime, UNIX_EPOCH},
};
use std::collections::HashMap;
use tokio::runtime::Runtime;

#[tokio::test]
async fn test_count() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &Int32Array>().unwrap();
let id = writer.id();

Expand Down Expand Up @@ -583,7 +577,8 @@ mod tests {
Runtime::new().unwrap().block_on(async {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider =
ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, u32>().unwrap();
let id = writer.id();

Expand Down Expand Up @@ -640,7 +635,8 @@ mod tests {
Runtime::new().unwrap().block_on(async {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider =
ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, u32>().unwrap();
let id = writer.id();
println!("Number of keys {}", num_keys);
Expand Down Expand Up @@ -747,7 +743,7 @@ mod tests {
async fn test_blockfile() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &Int32Array>().unwrap();
let id = writer.id();

Expand Down Expand Up @@ -779,7 +775,7 @@ mod tests {
async fn test_splitting() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &Int32Array>().unwrap();
let id_1 = writer.id();

Expand Down Expand Up @@ -881,7 +877,7 @@ mod tests {
async fn test_splitting_boundary() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &Int32Array>().unwrap();
let id_1 = writer.id();

Expand Down Expand Up @@ -915,7 +911,7 @@ mod tests {
async fn test_string_value() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = blockfile_provider.create::<&str, &str>().unwrap();
let id = writer.id();
Expand Down Expand Up @@ -944,7 +940,7 @@ mod tests {
async fn test_float_key() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let provider = ArrowBlockfileProvider::new(storage);
let provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = provider.create::<f32, &str>().unwrap();
let id = writer.id();
Expand All @@ -970,7 +966,7 @@ mod tests {
async fn test_roaring_bitmap_value() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = blockfile_provider
.create::<&str, &roaring::RoaringBitmap>()
Expand Down Expand Up @@ -1005,7 +1001,7 @@ mod tests {
async fn test_uint_key_val() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = blockfile_provider.create::<u32, u32>().unwrap();
let id = writer.id();
Expand All @@ -1031,7 +1027,7 @@ mod tests {
async fn test_data_record_val() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = blockfile_provider.create::<&str, &DataRecord>().unwrap();
let id = writer.id();
Expand Down Expand Up @@ -1075,13 +1071,13 @@ mod tests {
// Tests the case where a value is larger than half the block size
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);

let writer = blockfile_provider.create::<&str, &str>().unwrap();
let id = writer.id();

let val_1_small = "a";
let val_2_large = "a".repeat(MAX_BLOCK_SIZE / 2 + 1);
let val_2_large = "a".repeat(TEST_MAX_BLOCK_SIZE_BYTES / 2 + 1);

writer.set("key", "1", val_1_small).await.unwrap();
writer.set("key", "2", val_2_large.as_str()).await.unwrap();
Expand All @@ -1099,7 +1095,7 @@ mod tests {
async fn test_delete() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &str>().unwrap();
let id = writer.id();

Expand Down Expand Up @@ -1153,7 +1149,7 @@ mod tests {
async fn test_get_at_index() {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider = ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, &Int32Array>().unwrap();
let id_1 = writer.id();

Expand Down
5 changes: 3 additions & 2 deletions rust/worker/src/blockstore/arrow/concurrency_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests {
use crate::{
blockstore::arrow::provider::ArrowBlockfileProvider,
blockstore::arrow::{config::TEST_MAX_BLOCK_SIZE_BYTES, provider::ArrowBlockfileProvider},
storage::{local::LocalStorage, Storage},
};
use rand::Rng;
Expand All @@ -13,7 +13,8 @@ mod tests {
|| {
let tmp_dir = tempfile::tempdir().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let blockfile_provider = ArrowBlockfileProvider::new(storage);
let blockfile_provider =
ArrowBlockfileProvider::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let writer = blockfile_provider.create::<&str, u32>().unwrap();
let id = writer.id();

Expand Down
Loading
Loading