From 81c2b9e4219ba1f3081a4c17cba586bf50d36d0c Mon Sep 17 00:00:00 2001 From: Mathieu Dutour Sikiric Date: Mon, 16 Sep 2024 08:00:27 +0200 Subject: [PATCH] Extend the cache support. --- CLI.md | 4 +- Cargo.lock | 1 + examples/Cargo.lock | 1 + linera-client/src/client_options.rs | 11 +- linera-indexer/lib/src/rocks_db.rs | 11 +- linera-indexer/lib/src/scylla_db.rs | 11 +- linera-service/src/proxy/main.rs | 11 +- linera-service/src/server.rs | 24 +- linera-storage-service/src/client.rs | 35 +- linera-storage-service/src/common.rs | 4 +- linera-views/Cargo.toml | 1 + linera-views/DESIGN.md | 2 +- linera-views/src/backends/dynamo_db.rs | 41 +- linera-views/src/backends/indexed_db.rs | 3 +- linera-views/src/backends/lru_caching.rs | 820 ++++++++++++++++++++--- linera-views/src/backends/memory.rs | 11 +- linera-views/src/backends/metering.rs | 18 +- linera-views/src/backends/rocks_db.rs | 137 ++-- linera-views/src/backends/scylla_db.rs | 46 +- linera-views/src/store.rs | 13 +- linera-views/src/test_utils/mod.rs | 190 +++++- linera-views/tests/store_tests.rs | 34 +- linera-views/tests/views_tests.rs | 26 +- 23 files changed, 1161 insertions(+), 294 deletions(-) diff --git a/CLI.md b/CLI.md index 2fb21d994ca6..a09f873143b2 100644 --- a/CLI.md +++ b/CLI.md @@ -110,9 +110,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp * `--max-stream-queries ` — The maximal number of simultaneous stream queries to the database Default value: `10` -* `--cache-size ` — The maximal number of entries in the storage cache - - Default value: `1000` +* `--storage-cache-policy ` — The storage cache policy * `--retry-delay-ms ` — Delay increment for retrying to connect to a validator Default value: `1000` diff --git a/Cargo.lock b/Cargo.lock index c7ad3081a161..bfdbf2095ee5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4663,6 +4663,7 @@ dependencies = [ "rocksdb", "scylla", "serde", + "serde_json", "sha3", "static_assertions", "tempfile", diff --git a/examples/Cargo.lock b/examples/Cargo.lock index cb03afc60cc9..d3d395327654 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3484,6 +3484,7 @@ dependencies = [ "prometheus", "rand", "serde", + "serde_json", "sha3", "static_assertions", "tempfile", diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index 44307362d691..d6faba694b14 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -16,7 +16,7 @@ use linera_core::client::BlanketMessagePolicy; use linera_execution::{ committee::ValidatorName, ResourceControlPolicy, WasmRuntime, WithWasmDefault as _, }; -use linera_views::store::CommonStoreConfig; +use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy}; #[cfg(feature = "fs")] use crate::config::GenesisConfig; @@ -104,9 +104,9 @@ pub struct ClientOptions { #[arg(long, default_value = "10")] pub max_stream_queries: usize, - /// The maximal number of entries in the storage cache. - #[arg(long, default_value = "1000")] - pub cache_size: usize, + /// The storage cache policy + #[arg(long)] + pub storage_cache_policy: Option, /// Subcommand. 
#[command(subcommand)] @@ -167,10 +167,11 @@ impl ClientOptions { } fn common_config(&self) -> CommonStoreConfig { + let storage_cache_policy = read_storage_cache_policy(self.storage_cache_policy.clone()); CommonStoreConfig { max_concurrent_queries: self.max_concurrent_queries, max_stream_queries: self.max_stream_queries, - cache_size: self.cache_size, + storage_cache_policy, } } diff --git a/linera-indexer/lib/src/rocks_db.rs b/linera-indexer/lib/src/rocks_db.rs index ffe3c02d9a8e..520bc72d5da0 100644 --- a/linera-indexer/lib/src/rocks_db.rs +++ b/linera-indexer/lib/src/rocks_db.rs @@ -5,6 +5,7 @@ use std::path::PathBuf; use clap::Parser as _; use linera_views::{ + lru_caching::read_storage_cache_policy, rocks_db::{PathWithGuard, RocksDbSpawnMode, RocksDbStore, RocksDbStoreConfig}, store::{AdminKeyValueStore, CommonStoreConfig}, }; @@ -28,9 +29,9 @@ pub struct RocksDbConfig { /// The maximal number of simultaneous stream queries to the database #[arg(long, default_value = "10")] pub max_stream_queries: usize, - /// The maximal number of entries in the storage cache. - #[arg(long, default_value = "1000")] - cache_size: usize, + /// The file of the storage cache policy + #[arg(long)] + storage_cache_policy: Option, } pub type RocksDbRunner = Runner; @@ -38,10 +39,12 @@ pub type RocksDbRunner = Runner; impl RocksDbRunner { pub async fn load() -> Result { let config = IndexerConfig::::parse(); + let storage_cache_policy = + read_storage_cache_policy(config.client.storage_cache_policy.clone()); let common_config = CommonStoreConfig { max_concurrent_queries: config.client.max_concurrent_queries, max_stream_queries: config.client.max_stream_queries, - cache_size: config.client.cache_size, + storage_cache_policy, }; let path_buf = config.client.storage.as_path().to_path_buf(); let path_with_guard = PathWithGuard::new(path_buf); diff --git a/linera-indexer/lib/src/scylla_db.rs b/linera-indexer/lib/src/scylla_db.rs index 8c6f6a0ff630..90662d7715bd 100644 --- a/linera-indexer/lib/src/scylla_db.rs +++ b/linera-indexer/lib/src/scylla_db.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use linera_views::{ + lru_caching::read_storage_cache_policy, scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig}, store::{AdminKeyValueStore, CommonStoreConfig}, }; @@ -25,9 +26,9 @@ pub struct ScyllaDbConfig { /// The maximal number of simultaneous stream queries to the database #[arg(long, default_value = "10")] pub max_stream_queries: usize, - /// The maximal number of entries in the storage cache. 
- #[arg(long, default_value = "1000")] - cache_size: usize, + /// The storage cache policy + #[arg(long)] + storage_cache_policy: Option, } pub type ScyllaDbRunner = Runner; @@ -35,10 +36,12 @@ pub type ScyllaDbRunner = Runner; impl ScyllaDbRunner { pub async fn load() -> Result { let config = as clap::Parser>::parse(); + let storage_cache_policy = + read_storage_cache_policy(config.client.storage_cache_policy.clone()); let common_config = CommonStoreConfig { max_concurrent_queries: config.client.max_concurrent_queries, max_stream_queries: config.client.max_stream_queries, - cache_size: config.client.cache_size, + storage_cache_policy, }; let namespace = config.client.table.clone(); let root_key = &[]; diff --git a/linera-service/src/proxy/main.rs b/linera-service/src/proxy/main.rs index 3387dc5090cb..65c0fe8b41c6 100644 --- a/linera-service/src/proxy/main.rs +++ b/linera-service/src/proxy/main.rs @@ -25,7 +25,7 @@ use linera_rpc::{ use linera_service::prometheus_server; use linera_service::util; use linera_storage::Storage; -use linera_views::store::CommonStoreConfig; +use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument}; @@ -68,9 +68,9 @@ pub struct ProxyOptions { #[arg(long, default_value = "10")] max_stream_queries: usize, - /// The maximal number of entries in the storage cache. - #[arg(long, default_value = "1000")] - cache_size: usize, + /// The storage cache policy + #[arg(long)] + storage_cache_policy: Option, /// Path to the file describing the initial user chains (aka genesis state) #[arg(long = "genesis")] @@ -366,10 +366,11 @@ fn main() -> Result<()> { impl ProxyOptions { async fn run(&self) -> Result<()> { + let storage_cache_policy = read_storage_cache_policy(self.storage_cache_policy.clone()); let common_config = CommonStoreConfig { max_concurrent_queries: self.max_concurrent_queries, max_stream_queries: self.max_stream_queries, - cache_size: self.cache_size, + storage_cache_policy, }; let full_storage_config = self.storage_config.add_common_config(common_config).await?; let genesis_config: GenesisConfig = util::read_json(&self.genesis_config_path)?; diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index fca0c0bffc55..4d630dbc0b2f 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -33,7 +33,7 @@ use linera_rpc::{ use linera_service::prometheus_server; use linera_service::util; use linera_storage::Storage; -use linera_views::store::CommonStoreConfig; +use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy}; use serde::Deserialize; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -350,9 +350,9 @@ enum ServerCommand { #[arg(long, default_value = "10")] max_stream_queries: usize, - /// The maximal number of entries in the storage cache. - #[arg(long, default_value = "1000")] - cache_size: usize, + /// The storage cache policy + #[arg(long)] + storage_cache_policy: Option, }, /// Act as a trusted third-party and generate all server configurations @@ -391,9 +391,9 @@ enum ServerCommand { #[arg(long, default_value = "10")] max_stream_queries: usize, - /// The maximal number of entries in the storage cache. 
- #[arg(long, default_value = "1000")] - cache_size: usize, + /// The storage cache policy + #[arg(long)] + storage_cache_policy: Option, }, } @@ -459,7 +459,7 @@ async fn run(options: ServerOptions) { wasm_runtime, max_concurrent_queries, max_stream_queries, - cache_size, + storage_cache_policy, } => { let genesis_config: GenesisConfig = util::read_json(&genesis_config_path).expect("Fail to read initial chain config"); @@ -481,10 +481,11 @@ async fn run(options: ServerOptions) { grace_period, }; let wasm_runtime = wasm_runtime.with_wasm_default(); + let storage_cache_policy = read_storage_cache_policy(storage_cache_policy); let common_config = CommonStoreConfig { max_concurrent_queries, max_stream_queries, - cache_size, + storage_cache_policy, }; let full_storage_config = storage_config .add_common_config(common_config) @@ -541,14 +542,15 @@ async fn run(options: ServerOptions) { genesis_config_path, max_concurrent_queries, max_stream_queries, - cache_size, + storage_cache_policy, } => { let genesis_config: GenesisConfig = util::read_json(&genesis_config_path).expect("Fail to read initial chain config"); + let storage_cache_policy = read_storage_cache_policy(storage_cache_policy); let common_config = CommonStoreConfig { max_concurrent_queries, max_stream_queries, - cache_size, + storage_cache_policy, }; let full_storage_config = storage_config .add_common_config(common_config) diff --git a/linera-storage-service/src/client.rs b/linera-storage-service/src/client.rs index d484922feb8e..f19cdda373f5 100644 --- a/linera-storage-service/src/client.rs +++ b/linera-storage-service/src/client.rs @@ -9,7 +9,7 @@ use linera_base::ensure; use linera_views::metering::MeteredStore; use linera_views::{ batch::{Batch, WriteOperation}, - lru_caching::LruCachingStore, + lru_caching::{CachingStore, StorageCachePolicy, DEFAULT_STORAGE_CACHE_POLICY}, store::{ AdminKeyValueStore, CommonStoreConfig, ReadableKeyValueStore, WithError, WritableKeyValueStore, @@ -62,7 +62,7 @@ pub struct ServiceStoreClientInternal { channel: Channel, semaphore: Option>, max_stream_queries: usize, - cache_size: usize, + storage_cache_policy: StorageCachePolicy, namespace: Vec, root_key: Vec, } @@ -400,14 +400,14 @@ impl AdminKeyValueStore for ServiceStoreClientInternal { .max_concurrent_queries .map(|n| Arc::new(Semaphore::new(n))); let max_stream_queries = config.common_config.max_stream_queries; - let cache_size = config.common_config.cache_size; + let storage_cache_policy = config.common_config.storage_cache_policy.clone(); let namespace = Self::namespace_as_vec(namespace)?; let root_key = root_key.to_vec(); Ok(Self { channel, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, namespace, root_key, }) @@ -417,14 +417,14 @@ impl AdminKeyValueStore for ServiceStoreClientInternal { let channel = self.channel.clone(); let semaphore = self.semaphore.clone(); let max_stream_queries = self.max_stream_queries; - let cache_size = self.cache_size; + let storage_cache_policy = self.storage_cache_policy.clone(); let namespace = self.namespace.clone(); let root_key = root_key.to_vec(); Ok(Self { channel, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, namespace, root_key, }) @@ -503,11 +503,11 @@ impl TestKeyValueStore for ServiceStoreClientInternal { /// Creates the `CommonStoreConfig` for the `ServiceStoreClientInternal`. 
pub fn create_service_store_common_config() -> CommonStoreConfig { let max_stream_queries = 100; - let cache_size = 10; // unused + let storage_cache_policy = DEFAULT_STORAGE_CACHE_POLICY; CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size, + storage_cache_policy, } } @@ -552,9 +552,9 @@ pub async fn create_service_test_store() -> Result>>, + store: MeteredStore>>, #[cfg(not(with_metrics))] - store: LruCachingStore, + store: CachingStore, } impl WithError for ServiceStoreClient { @@ -624,15 +624,15 @@ impl AdminKeyValueStore for ServiceStoreClient { namespace: &str, root_key: &[u8], ) -> Result { - let cache_size = config.common_config.cache_size; + let storage_cache_policy = config.common_config.storage_cache_policy.clone(); let store = ServiceStoreClientInternal::connect(config, namespace, root_key).await?; - Ok(ServiceStoreClient::from_inner(store, cache_size)) + Ok(ServiceStoreClient::from_inner(store, storage_cache_policy)) } fn clone_with_root_key(&self, root_key: &[u8]) -> Result { let store = self.inner().clone_with_root_key(root_key)?; - let cache_size = self.inner().cache_size; - Ok(ServiceStoreClient::from_inner(store, cache_size)) + let storage_cache_policy = self.inner().storage_cache_policy.clone(); + Ok(ServiceStoreClient::from_inner(store, storage_cache_policy)) } async fn list_all(config: &Self::Config) -> Result, ServiceStoreError> { @@ -674,10 +674,13 @@ impl ServiceStoreClient { &self.store.store } - fn from_inner(store: ServiceStoreClientInternal, cache_size: usize) -> ServiceStoreClient { + fn from_inner( + store: ServiceStoreClientInternal, + storage_cache_policy: StorageCachePolicy, + ) -> ServiceStoreClient { #[cfg(with_metrics)] let store = MeteredStore::new(&STORAGE_SERVICE_METRICS, store); - let store = LruCachingStore::new(store, cache_size); + let store = CachingStore::new(store, storage_cache_policy); #[cfg(with_metrics)] let store = MeteredStore::new(&LRU_STORAGE_SERVICE_METRICS, store); Self { store } diff --git a/linera-storage-service/src/common.rs b/linera-storage-service/src/common.rs index eaca57cb8314..388341aaa2ec 100644 --- a/linera-storage-service/src/common.rs +++ b/linera-storage-service/src/common.rs @@ -6,6 +6,8 @@ use std::path::PathBuf; use std::sync::LazyLock; use linera_base::command::resolve_binary; +#[cfg(with_testing)] +use linera_views::lru_caching::DEFAULT_STORAGE_CACHE_POLICY; #[cfg(with_metrics)] use linera_views::metering::KeyValueStoreMetrics; use linera_views::{ @@ -83,7 +85,7 @@ pub fn create_shared_store_common_config() -> CommonStoreConfig { CommonStoreConfig { max_concurrent_queries: Some(TEST_SHARED_STORE_MAX_CONCURRENT_QUERIES), max_stream_queries: TEST_SHARED_STORE_MAX_STREAM_QUERIES, - cache_size: usize::MAX, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, } } diff --git a/linera-views/Cargo.toml b/linera-views/Cargo.toml index b8f42d0ecb5b..e09ce814898a 100644 --- a/linera-views/Cargo.toml +++ b/linera-views/Cargo.toml @@ -50,6 +50,7 @@ rand = { workspace = true, features = ["small_rng"] } rocksdb = { workspace = true, optional = true } scylla = { workspace = true, optional = true } serde.workspace = true +serde_json.workspace = true sha3.workspace = true static_assertions.workspace = true tempfile.workspace = true diff --git a/linera-views/DESIGN.md b/linera-views/DESIGN.md index 5ae2d7326726..9dac81ef0a38 100644 --- a/linera-views/DESIGN.md +++ b/linera-views/DESIGN.md @@ -17,7 +17,7 @@ We provide an implementation of the trait `KeyValueStore` for the following key- The trait 
`KeyValueStore` was designed so that more storage solutions can be easily added in the future. The `KeyValueStore` trait is also implemented for several internal constructions of clients: -* The `LruCachingStore` client implements the Least Recently Used (LRU) +* The `CachingStore` client implements the Least Recently Used (LRU) caching of reads into the client. * The `ViewContainer` client implements a key-value store client from a context. * The `ValueSplittingStore` implements a client for which the diff --git a/linera-views/src/backends/dynamo_db.rs b/linera-views/src/backends/dynamo_db.rs index 70f3588c01db..95be193ef978 100644 --- a/linera-views/src/backends/dynamo_db.rs +++ b/linera-views/src/backends/dynamo_db.rs @@ -37,12 +37,12 @@ use { #[cfg(with_metrics)] use crate::metering::{ - MeteredStore, DYNAMO_DB_METRICS, LRU_CACHING_METRICS, VALUE_SPLITTING_METRICS, + MeteredStore, CACHED_DYNAMO_DB_METRICS, DYNAMO_DB_METRICS, VALUE_SPLITTING_METRICS, }; use crate::{ batch::{Batch, SimpleUnorderedBatch}, journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore}, - lru_caching::LruCachingStore, + lru_caching::{CachingStore, StorageCachePolicy}, store::{ AdminKeyValueStore, CommonStoreConfig, KeyIterable, KeyValueIterable, KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, @@ -50,7 +50,7 @@ use crate::{ value_splitting::{ValueSplittingError, ValueSplittingStore}, }; #[cfg(with_testing)] -use crate::{lru_caching::TEST_CACHE_SIZE, store::TestKeyValueStore}; +use crate::{lru_caching::DEFAULT_STORAGE_CACHE_POLICY, store::TestKeyValueStore}; /// Name of the environment variable with the address to a LocalStack instance. const LOCALSTACK_ENDPOINT: &str = "LOCALSTACK_ENDPOINT"; @@ -340,7 +340,7 @@ pub struct DynamoDbStoreInternal { namespace: String, semaphore: Option>, max_stream_queries: usize, - cache_size: usize, + storage_cache_policy: StorageCachePolicy, root_key: Vec, } @@ -368,7 +368,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal { .max_concurrent_queries .map(|n| Arc::new(Semaphore::new(n))); let max_stream_queries = config.common_config.max_stream_queries; - let cache_size = config.common_config.cache_size; + let storage_cache_policy = config.common_config.storage_cache_policy.clone(); let namespace = namespace.to_string(); let root_key = root_key.to_vec(); Ok(Self { @@ -376,7 +376,7 @@ impl AdminKeyValueStore for DynamoDbStoreInternal { namespace, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, root_key, }) } @@ -386,14 +386,14 @@ impl AdminKeyValueStore for DynamoDbStoreInternal { let namespace = self.namespace.clone(); let semaphore = self.semaphore.clone(); let max_stream_queries = self.max_stream_queries; - let cache_size = self.cache_size; + let storage_cache_policy = self.storage_cache_policy.clone(); let root_key = root_key.to_vec(); Ok(Self { client, namespace, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, root_key, }) } @@ -1164,20 +1164,20 @@ impl KeyValueStoreError for DynamoDbStoreInternalError { const BACKEND: &'static str = "dynamo_db"; } -/// A shared DB client for DynamoDb implementing LruCaching +/// A shared DB client for DynamoDb implementing caching #[derive(Clone)] #[expect(clippy::type_complexity)] pub struct DynamoDbStore { #[cfg(with_metrics)] store: MeteredStore< - LruCachingStore< + CachingStore< MeteredStore< ValueSplittingStore>>, >, >, >, #[cfg(not(with_metrics))] - store: LruCachingStore>>, + store: CachingStore>>, } /// The combined error type 
for the `DynamoDbStore`. @@ -1255,15 +1255,15 @@ impl AdminKeyValueStore for DynamoDbStore { namespace: &str, root_key: &[u8], ) -> Result { - let cache_size = config.common_config.cache_size; + let storage_cache_policy = &config.common_config.storage_cache_policy; let simple_store = DynamoDbStoreInternal::connect(config, namespace, root_key).await?; - Ok(Self::from_inner(simple_store, cache_size)) + Ok(Self::from_inner(simple_store, storage_cache_policy)) } fn clone_with_root_key(&self, root_key: &[u8]) -> Result { - let cache_size = self.inner().cache_size; + let storage_cache_policy = &self.inner().storage_cache_policy; let simple_store = self.inner().clone_with_root_key(root_key)?; - Ok(Self::from_inner(simple_store, cache_size)) + Ok(Self::from_inner(simple_store, storage_cache_policy)) } async fn list_all(config: &Self::Config) -> Result, DynamoDbStoreError> { @@ -1293,7 +1293,7 @@ impl TestKeyValueStore for DynamoDbStore { let common_config = CommonStoreConfig { max_concurrent_queries: Some(TEST_DYNAMO_DB_MAX_CONCURRENT_QUERIES), max_stream_queries: TEST_DYNAMO_DB_MAX_STREAM_QUERIES, - cache_size: TEST_CACHE_SIZE, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; let use_localstack = true; let config = get_config_internal(use_localstack).await?; @@ -1315,16 +1315,19 @@ impl DynamoDbStore { &self.store.store.store.store } - fn from_inner(simple_store: DynamoDbStoreInternal, cache_size: usize) -> DynamoDbStore { + fn from_inner( + simple_store: DynamoDbStoreInternal, + storage_cache_policy: &StorageCachePolicy, + ) -> DynamoDbStore { let store = JournalingKeyValueStore::new(simple_store); #[cfg(with_metrics)] let store = MeteredStore::new(&DYNAMO_DB_METRICS, store); let store = ValueSplittingStore::new(store); #[cfg(with_metrics)] let store = MeteredStore::new(&VALUE_SPLITTING_METRICS, store); - let store = LruCachingStore::new(store, cache_size); + let store = CachingStore::new(store, storage_cache_policy.clone()); #[cfg(with_metrics)] - let store = MeteredStore::new(&LRU_CACHING_METRICS, store); + let store = MeteredStore::new(&CACHED_DYNAMO_DB_METRICS, store); Self { store } } } diff --git a/linera-views/src/backends/indexed_db.rs b/linera-views/src/backends/indexed_db.rs index ef5ada421d11..a11b780ff796 100644 --- a/linera-views/src/backends/indexed_db.rs +++ b/linera-views/src/backends/indexed_db.rs @@ -12,6 +12,7 @@ use thiserror::Error; use crate::{ batch::{Batch, WriteOperation}, common::get_upper_bound_option, + lru_caching::DEFAULT_STORAGE_CACHE_POLICY, store::{ CommonStoreConfig, KeyValueStoreError, LocalAdminKeyValueStore, LocalReadableKeyValueStore, LocalWritableKeyValueStore, WithError, @@ -31,7 +32,7 @@ impl IndexedDbStoreConfig { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; Self { common_config } } diff --git a/linera-views/src/backends/lru_caching.rs b/linera-views/src/backends/lru_caching.rs index 69db6c1fb49c..d115ed237f8e 100644 --- a/linera-views/src/backends/lru_caching.rs +++ b/linera-views/src/backends/lru_caching.rs @@ -1,45 +1,305 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -//! Add LRU (least recently used) caching to a given store. +//! Add caching to a given store. +//! The current policy is based on the LRU (Least Recently Used). -/// The standard cache size used for tests. 
-pub const TEST_CACHE_SIZE: usize = 1000; +use std::{fs::File, io::BufReader}; + +/// The parametrization of the cache +#[derive(Clone, Debug, serde::Deserialize)] +pub struct StorageCachePolicy { + /// The maximum size of the cache, in bytes (keys size + value sizes) + pub max_cache_size: usize, + /// The maximum size of an entry size, in bytes + pub max_entry_size: usize, + /// The maximum number of entries in the cache. + pub max_cache_entries: usize, +} + +/// The maximum number of entries in the cache. +/// If the number of entries in the cache is too large then the underlying maps +/// become the limiting factor +pub const DEFAULT_STORAGE_CACHE_POLICY: StorageCachePolicy = StorageCachePolicy { + max_cache_size: 10000000, + max_entry_size: 1000000, + max_cache_entries: 1000, +}; + +/// Read the `StorageCachePolicy` from the existing file and if None, then the default +/// storage policy is returned +pub fn read_storage_cache_policy(storage_cache_policy: Option) -> StorageCachePolicy { + match storage_cache_policy { + None => DEFAULT_STORAGE_CACHE_POLICY, + Some(storage_cache_policy) => { + let file = File::open(storage_cache_policy) + .expect("File {storage_cache_policy} does not exist"); + let reader = BufReader::new(file); + serde_json::from_reader(reader).expect("The parsing of the StorageCachePolicy failed") + } + } +} #[cfg(with_metrics)] use std::sync::LazyLock; use std::{ - collections::{btree_map, hash_map::RandomState, BTreeMap}, + collections::{btree_map, hash_map::RandomState, BTreeMap, BTreeSet}, sync::{Arc, Mutex}, }; use linked_hash_map::LinkedHashMap; #[cfg(with_metrics)] -use {linera_base::prometheus_util, prometheus::IntCounterVec}; +use { + linera_base::prometheus_util, + prometheus::{HistogramVec, IntCounterVec}, +}; #[cfg(with_testing)] use crate::memory::MemoryStore; use crate::{ batch::{Batch, WriteOperation}, common::get_interval, - store::{ReadableKeyValueStore, RestrictedKeyValueStore, WithError, WritableKeyValueStore}, + store::{ + KeyIterable, KeyValueIterable, ReadableKeyValueStore, RestrictedKeyValueStore, WithError, + WritableKeyValueStore, + }, }; #[cfg(with_metrics)] -/// The total number of cache faults -static NUM_CACHE_FAULT: LazyLock = LazyLock::new(|| { - prometheus_util::register_int_counter_vec("num_cache_fault", "Number of cache faults", &[]) - .expect("Counter creation should not fail") +/// The total number of value cache faults +static NUM_CACHE_VALUE_FAULT: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "num_cache_value_fault", + "Number of value cache faults", + &[], + ) + .expect("Counter creation should not fail") }); #[cfg(with_metrics)] /// The total number of cache successes -static NUM_CACHE_SUCCESS: LazyLock = LazyLock::new(|| { - prometheus_util::register_int_counter_vec("num_cache_success", "Number of cache success", &[]) - .expect("Counter creation should not fail") +static NUM_CACHE_VALUE_SUCCESS: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "num_cache_value_success", + "Number of value cache success", + &[], + ) + .expect("Counter creation should not fail") +}); + +#[cfg(with_metrics)] +/// The total number of find cache faults +static NUM_CACHE_FIND_FAULT: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "num_cache_find_fault", + "Number of find cache faults", + &[], + ) + .expect("Counter creation should not fail") }); -/// The `LruPrefixCache` stores the data for simple `read_values` queries +#[cfg(with_metrics)] +/// The total number of 
find cache successes +static NUM_CACHE_FIND_SUCCESS: LazyLock = LazyLock::new(|| { + prometheus_util::register_int_counter_vec( + "num_cache_find_success", + "Number of find cache success", + &[], + ) + .expect("Counter creation should not fail") +}); + +#[cfg(with_metrics)] +/// Size of the inserted value entry +static VALUE_CACHE_ENTRY_SIZE: LazyLock = LazyLock::new(|| { + prometheus_util::register_histogram_vec( + "value_cache_entry_size", + "Value cache entry size", + &[], + Some(vec![ + 10.0, 30.0, 100.0, 300.0, 1000.0, 3000.0, 10000.0, 30000.0, 100000.0, 300000.0, + 1000000.0, + ]), + ) + .expect("Histogram can be created") +}); + +#[cfg(with_metrics)] +/// Size of the inserted find entry +static FIND_CACHE_ENTRY_SIZE: LazyLock = LazyLock::new(|| { + prometheus_util::register_histogram_vec( + "find_cache_entry_size", + "Find cache entry size", + &[], + Some(vec![ + 10.0, 30.0, 100.0, 300.0, 1000.0, 3000.0, 10000.0, 30000.0, 100000.0, 300000.0, + 1000000.0, + ]), + ) + .expect("Histogram can be created") +}); + +#[derive(Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +enum CacheEntry { + Find, + Value, +} + +enum ValueCacheEntry { + DoesNotExist, + Exists, + Value(Vec), +} + +impl ValueCacheEntry { + fn from_contains_key(test: bool) -> ValueCacheEntry { + match test { + false => ValueCacheEntry::DoesNotExist, + true => ValueCacheEntry::Exists, + } + } + + fn from_read_value(result: &Option>) -> ValueCacheEntry { + match result { + None => ValueCacheEntry::DoesNotExist, + Some(vec) => ValueCacheEntry::Value(vec.to_vec()), + } + } + + fn get_contains_key(&self) -> bool { + match self { + ValueCacheEntry::DoesNotExist => false, + ValueCacheEntry::Exists => true, + ValueCacheEntry::Value(_) => true, + } + } + + fn get_read_value(&self) -> Option>> { + match self { + ValueCacheEntry::DoesNotExist => Some(None), + ValueCacheEntry::Exists => None, + ValueCacheEntry::Value(vec) => Some(Some(vec.clone())), + } + } + + fn size(&self) -> usize { + match self { + ValueCacheEntry::DoesNotExist => 0, + ValueCacheEntry::Exists => 0, + ValueCacheEntry::Value(vec) => vec.len(), + } + } +} + +enum FindCacheEntry { + Keys(BTreeSet>), + KeyValues(BTreeMap, Vec>), +} + +impl FindCacheEntry { + fn get_find_keys(&self, key_prefix: &[u8]) -> Vec> { + let key_prefix = key_prefix.to_vec(); + let delta = key_prefix.len(); + match self { + FindCacheEntry::Keys(map) => map + .range(get_interval(key_prefix)) + .map(|key| key[delta..].to_vec()) + .collect::>(), + FindCacheEntry::KeyValues(map) => map + .range(get_interval(key_prefix)) + .map(|(key, _)| key[delta..].to_vec()) + .collect::>(), + } + } + + fn get_find_key_values(&self, key_prefix: &[u8]) -> Option, Vec)>> { + match self { + FindCacheEntry::Keys(_) => None, + FindCacheEntry::KeyValues(map) => { + let key_prefix = key_prefix.to_vec(); + let delta = key_prefix.len(); + Some( + map.range(get_interval(key_prefix)) + .map(|(key, value)| (key[delta..].to_vec(), value.to_vec())) + .collect::>(), + ) + } + } + } + + fn get_contains_key(&self, key: &[u8]) -> bool { + match self { + FindCacheEntry::Keys(map) => map.contains(key), + FindCacheEntry::KeyValues(map) => map.contains_key(key), + } + } + + fn get_read_value(&self, key: &[u8]) -> Option>> { + match self { + FindCacheEntry::Keys(_map) => None, + FindCacheEntry::KeyValues(map) => Some(map.get(key).cloned()), + } + } + + fn update_cache_entry(&mut self, key: &[u8], new_value: Option>) { + match self { + FindCacheEntry::Keys(map) => { + match new_value { + None => map.remove(key), + Some(_) => 
map.insert(key.to_vec()), + }; + } + FindCacheEntry::KeyValues(map) => { + match new_value { + None => map.remove(key), + Some(new_value) => map.insert(key.to_vec(), new_value), + }; + } + } + } + + fn delete_prefix(&mut self, key_prefix: &[u8]) { + match self { + FindCacheEntry::Keys(map) => { + let keys = map + .range(get_interval(key_prefix.to_vec())) + .cloned() + .collect::>(); + for key in keys { + map.remove(&key); + } + } + FindCacheEntry::KeyValues(map) => { + let keys = map + .range(get_interval(key_prefix.to_vec())) + .map(|(key, _)| key.clone()) + .collect::>(); + for key in keys { + map.remove(&key); + } + } + } + } + + fn size(&self) -> usize { + let mut total_size = 0; + match self { + FindCacheEntry::Keys(map) => { + for key in map { + total_size += key.len(); + } + } + FindCacheEntry::KeyValues(map) => { + for (key, value) in map { + total_size += key.len() + value.len(); + } + } + } + total_size + } +} + +/// The `StoragePrefixCache` stores the data for simple `read_values` queries /// It is inspired by the crate `lru-cache`. /// /// We cannot apply this crate directly because the batch operation @@ -48,118 +308,402 @@ static NUM_CACHE_SUCCESS: LazyLock = LazyLock::new(|| { /// keep track of this. /// The data structures -struct LruPrefixCache { - map: BTreeMap, Option>>, - queue: LinkedHashMap, (), RandomState>, - max_cache_size: usize, +#[allow(clippy::type_complexity)] +struct StoragePrefixCache { + map_find: BTreeMap, FindCacheEntry>, + map_value: BTreeMap, ValueCacheEntry>, + queue: LinkedHashMap<(Vec, CacheEntry), usize, RandomState>, + storage_cache_policy: StorageCachePolicy, + total_size: usize, } -impl<'a> LruPrefixCache { +impl StoragePrefixCache { /// Creates a LruPrefixCache. - pub fn new(max_cache_size: usize) -> Self { + pub fn new(storage_cache_policy: StorageCachePolicy) -> Self { Self { - map: BTreeMap::new(), + map_find: BTreeMap::new(), + map_value: BTreeMap::new(), queue: LinkedHashMap::new(), - max_cache_size, + storage_cache_policy, + total_size: 0, + } + } + + fn get_lower_bound(&self, key: &[u8]) -> Option<(&Vec, &FindCacheEntry)> { + match self.map_find.range(..=key.to_vec()).next_back() { + None => None, + Some((key_store, value)) => { + if key.starts_with(key_store) { + Some((key_store, value)) + } else { + None + } + } + } + } + + fn get_lower_bound_update(&mut self, key: &[u8]) -> Option<(&Vec, &mut FindCacheEntry)> { + match self.map_find.range_mut(..=key.to_vec()).next_back() { + None => None, + Some((key_store, value)) => { + if key.starts_with(key_store) { + Some((key_store, value)) + } else { + None + } + } + } + } + + /// Inserts a new entry, clearing entries of the queue if needed + fn insert_queue(&mut self, full_key: (Vec, CacheEntry), cache_size: usize) { + self.queue.insert(full_key, cache_size); + self.total_size += cache_size; + while self.total_size > self.storage_cache_policy.max_cache_size + || self.queue.len() > self.storage_cache_policy.max_cache_entries + { + let Some(value) = self.queue.pop_front() else { + break; + }; + match value.0 { + (v, CacheEntry::Find) => { + self.map_find.remove(&v); + } + (v, CacheEntry::Value) => { + self.map_value.remove(&v); + } + } + self.total_size -= value.1; } } - /// Inserts an entry into the cache. 
- pub fn insert(&mut self, key: Vec, value: Option>) { - match self.map.entry(key.clone()) { + /// Removes an entry from the queue + fn remove_from_queue(&mut self, full_key: &(Vec, CacheEntry)) { + let existing_cache_size = self.queue.remove(full_key).unwrap(); + self.total_size -= existing_cache_size; + } + + /// Sets a size to zero in the queue + fn zero_set_entry_in_queue(&mut self, full_key: &(Vec, CacheEntry)) { + let len = full_key.0.len(); + let cache_size = self.queue.get_mut(full_key).unwrap(); + self.total_size -= *cache_size; + *cache_size = len; + self.total_size += *cache_size; + } + + /// Inserts a value entry into the cache. + pub fn insert_value(&mut self, key: Vec, cache_entry: ValueCacheEntry) { + let cache_size = cache_entry.size() + key.len(); + #[cfg(with_metrics)] + VALUE_CACHE_ENTRY_SIZE + .with_label_values(&[]) + .observe(cache_size as f64); + if cache_size > self.storage_cache_policy.max_entry_size { + return; + } + let full_key = (key.clone(), CacheEntry::Value); + match self.map_value.entry(key) { btree_map::Entry::Occupied(mut entry) => { - entry.insert(value); - // Put it on first position for LRU - self.queue.remove(&key); - self.queue.insert(key, ()); + entry.insert(cache_entry); + self.remove_from_queue(&full_key); } btree_map::Entry::Vacant(entry) => { - entry.insert(value); - self.queue.insert(key, ()); - if self.queue.len() > self.max_cache_size { - let Some(value) = self.queue.pop_front() else { - unreachable!() - }; - self.map.remove(&value.0); - } + entry.insert(cache_entry); + } + } + self.insert_queue(full_key, cache_size); + } + + /// Invalidates corresponding find entries + pub fn correct_find_entry(&mut self, key: &[u8], new_value: Option>) { + let lower_bound = self.get_lower_bound_update(key); + if let Some((lower_bound, cache_entry)) = lower_bound { + let key_red = &key[lower_bound.len()..]; + cache_entry.update_cache_entry(key_red, new_value); + } + } + + /// Inserts a read_value entry into the cache. + pub fn insert_read_value(&mut self, key: Vec, value: &Option>) { + let cache_entry = ValueCacheEntry::from_read_value(value); + self.insert_value(key, cache_entry) + } + + /// Inserts a read_value entry into the cache. + pub fn insert_contains_key(&mut self, key: Vec, result: bool) { + let cache_entry = ValueCacheEntry::from_contains_key(result); + self.insert_value(key, cache_entry) + } + + /// Inserts a find entry into the cache. 
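+    /// Existing find entries under `key_prefix` are evicted first, since the new entry
+    /// subsumes them; if the new entry carries key-values, the cached value entries under
+    /// the prefix are evicted as well. Entries larger than `max_cache_size` are not cached.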
+ pub fn insert_find(&mut self, key_prefix: Vec, cache_entry: FindCacheEntry) { + let cache_size = cache_entry.size() + key_prefix.len(); + #[cfg(with_metrics)] + FIND_CACHE_ENTRY_SIZE + .with_label_values(&[]) + .observe(cache_size as f64); + if cache_size > self.storage_cache_policy.max_cache_size { + // Inserting that entry would lead to complete clearing of the cache + // which is counter productive + return; + } + let keys = self + .map_find + .range(get_interval(key_prefix.clone())) + .map(|(x, _)| x.clone()) + .collect::>(); + for key in keys { + self.map_find.remove(&key); + let full_key = (key, CacheEntry::Find); + self.remove_from_queue(&full_key); + } + if let FindCacheEntry::KeyValues(_) = cache_entry { + let keys = self + .map_value + .range(get_interval(key_prefix.clone())) + .map(|(x, _)| x.clone()) + .collect::>(); + for key in keys { + self.map_value.remove(&key); + let full_key = (key, CacheEntry::Value); + self.remove_from_queue(&full_key); + } + } + let full_prefix = (key_prefix.clone(), CacheEntry::Find); + match self.map_find.entry(key_prefix.clone()) { + btree_map::Entry::Occupied(mut entry) => { + entry.insert(cache_entry); + self.remove_from_queue(&full_prefix); + } + btree_map::Entry::Vacant(entry) => { + entry.insert(cache_entry); } } + self.insert_queue(full_prefix, cache_size); + } + + /// Inserts a find entry into the cache. + pub fn insert_find_keys(&mut self, key_prefix: Vec, value: &[Vec]) { + let map = value.iter().cloned().collect::>(); + let cache_entry = FindCacheEntry::Keys(map); + self.insert_find(key_prefix, cache_entry); + } + + /// Inserts a find_key_values entry into the cache. + pub fn insert_find_key_values(&mut self, key_prefix: Vec, value: &[(Vec, Vec)]) { + let map = value.iter().cloned().collect::>(); + let cache_entry = FindCacheEntry::KeyValues(map); + self.insert_find(key_prefix, cache_entry); } /// Marks cached keys that match the prefix as deleted. Importantly, this does not create new entries in the cache. pub fn delete_prefix(&mut self, key_prefix: &[u8]) { - for (_, value) in self.map.range_mut(get_interval(key_prefix.to_vec())) { - *value = None; + let mut cache_entries = Vec::new(); + for (key, value) in self.map_value.range_mut(get_interval(key_prefix.to_vec())) { + *value = ValueCacheEntry::DoesNotExist; + cache_entries.push((key.clone(), CacheEntry::Value)); + } + for (key, value) in self.map_find.range_mut(get_interval(key_prefix.to_vec())) { + *value = FindCacheEntry::KeyValues(BTreeMap::new()); + cache_entries.push((key.clone(), CacheEntry::Find)); + } + for cache_entry in cache_entries { + self.zero_set_entry_in_queue(&cache_entry); + } + let lower_bound = self.get_lower_bound_update(key_prefix); + let result = if let Some((lower_bound, cache_entry)) = lower_bound { + let key_prefix_red = &key_prefix[lower_bound.len()..]; + cache_entry.delete_prefix(key_prefix_red); + let new_cache_size = cache_entry.size() + key_prefix.len(); + Some((new_cache_size, lower_bound.clone())) + } else { + None + }; + if let Some((new_cache_size, lower_bound)) = result { + let full_prefix = (lower_bound.clone(), CacheEntry::Find); + let existing_cache_size = self.queue.get_mut(&full_prefix).unwrap(); + self.total_size -= *existing_cache_size; + *existing_cache_size = new_cache_size; + self.total_size += new_cache_size; + } + } + + /// Gets the read_value entry from the key. 
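+    /// The value cache is consulted first; on a miss, a cached find entry whose prefix
+    /// covers the key may still answer the query. A hit refreshes the entry's position in
+    /// the LRU queue. `None` means the cache cannot answer and the store must be read.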
+ pub fn query_read_value(&mut self, key: &[u8]) -> Option>> { + let result = match self.map_value.get(key) { + None => None, + Some(entry) => entry.get_read_value(), + }; + if let Some(result) = result { + let full_key = (key.to_vec(), CacheEntry::Value); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + return Some(result); + } + let lower_bound = self.get_lower_bound(key); + let (lower_bound, result) = if let Some((lower_bound, cache_entry)) = lower_bound { + let key_red = &key[lower_bound.len()..]; + (Some(lower_bound), cache_entry.get_read_value(key_red)) + } else { + (None, None) + }; + if result.is_some() { + if let Some(lower_bound) = lower_bound { + let full_key = (lower_bound.clone(), CacheEntry::Find); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + } + } + result + } + + /// Gets the contains_key entry from the key. + pub fn query_contains_key(&mut self, key: &[u8]) -> Option { + let result = self + .map_value + .get(key) + .map(|entry| entry.get_contains_key()); + if let Some(result) = result { + let full_key = (key.to_vec(), CacheEntry::Value); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + return Some(result); + } + let lower_bound = self.get_lower_bound(key); + let (lower_bound, result) = if let Some((lower_bound, cache_entry)) = lower_bound { + let key_red = &key[lower_bound.len()..]; + ( + Some(lower_bound), + Some(cache_entry.get_contains_key(key_red)), + ) + } else { + (None, None) + }; + if result.is_some() { + if let Some(lower_bound) = lower_bound { + let full_key = (lower_bound.clone(), CacheEntry::Find); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + } } + result } - /// Gets the entry from the key. - pub fn query(&'a self, key: &'a [u8]) -> Option<&'a Option>> { - self.map.get(key) + /// Gets the find_keys entry from the key prefix + pub fn query_find_keys(&mut self, key_prefix: &[u8]) -> Option>> { + let (lower_bound, result) = match self.get_lower_bound(key_prefix) { + None => (None, None), + Some((lower_bound, cache_entry)) => { + let key_prefix_red = &key_prefix[lower_bound.len()..]; + ( + Some(lower_bound), + Some(cache_entry.get_find_keys(key_prefix_red)), + ) + } + }; + if let Some(lower_bound) = lower_bound { + let full_key = (lower_bound.clone(), CacheEntry::Find); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + } + result + } + + /// Gets the find key values entry from the key prefix + pub fn query_find_key_values(&mut self, key_prefix: &[u8]) -> Option, Vec)>> { + let (lower_bound, result) = match self.get_lower_bound(key_prefix) { + None => (None, None), + Some((lower_bound, cache_entry)) => { + let key_prefix_red = &key_prefix[lower_bound.len()..]; + ( + Some(lower_bound), + cache_entry.get_find_key_values(key_prefix_red), + ) + } + }; + if result.is_some() { + if let Some(lower_bound) = lower_bound { + let full_key = (lower_bound.clone(), CacheEntry::Find); + let cache_size = self.queue.remove(&full_key).unwrap(); + self.queue.insert(full_key, cache_size); + } + } + result } } -/// We take a store, a maximum size and build a LRU-based system. +/// We take a store, a maximum size and build a cache based system. 
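+///
+/// A minimal usage sketch (not a doctest): `inner_store` stands for any store implementing
+/// `RestrictedKeyValueStore`, and the policy values below are only illustrative.
+///
+/// ```ignore
+/// use linera_views::lru_caching::{CachingStore, StorageCachePolicy};
+///
+/// let policy = StorageCachePolicy {
+///     max_cache_size: 10_000_000,  // total bytes of cached keys and values
+///     max_entry_size: 1_000_000,   // larger entries bypass the cache
+///     max_cache_entries: 1000,     // maximum number of cached entries
+/// };
+/// let store = CachingStore::new(inner_store, policy);
+/// ```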
#[derive(Clone)] -pub struct LruCachingStore { - /// The inner store that is called by the LRU cache one +pub struct CachingStore { + /// The inner store that is called by the cached one pub store: K, - lru_read_values: Option>>, + cache: Option>>, } -impl WithError for LruCachingStore +impl WithError for CachingStore where K: WithError, { type Error = K::Error; } -impl ReadableKeyValueStore for LruCachingStore +impl ReadableKeyValueStore for CachingStore where K: ReadableKeyValueStore + Send + Sync, { - // The LRU cache does not change the underlying store's size limits. + // The cache does not change the underlying store's size limits. const MAX_KEY_SIZE: usize = K::MAX_KEY_SIZE; - type Keys = K::Keys; - type KeyValues = K::KeyValues; + type Keys = Vec>; + type KeyValues = Vec<(Vec, Vec)>; fn max_stream_queries(&self) -> usize { self.store.max_stream_queries() } async fn read_value_bytes(&self, key: &[u8]) -> Result>, Self::Error> { - let Some(lru_read_values) = &self.lru_read_values else { + let Some(cache) = &self.cache else { return self.store.read_value_bytes(key).await; }; - // First inquiring in the read_value_bytes LRU + // First inquiring in the read_value_bytes cache { - let lru_read_values_container = lru_read_values.lock().unwrap(); - if let Some(value) = lru_read_values_container.query(key) { + let mut cache = cache.lock().unwrap(); + if let Some(value) = cache.query_read_value(key) { #[cfg(with_metrics)] - NUM_CACHE_SUCCESS.with_label_values(&[]).inc(); - return Ok(value.clone()); + NUM_CACHE_VALUE_SUCCESS.with_label_values(&[]).inc(); + return Ok(value); } } #[cfg(with_metrics)] - NUM_CACHE_FAULT.with_label_values(&[]).inc(); + NUM_CACHE_VALUE_FAULT.with_label_values(&[]).inc(); let value = self.store.read_value_bytes(key).await?; - let mut lru_read_values = lru_read_values.lock().unwrap(); - lru_read_values.insert(key.to_vec(), value.clone()); + let mut cache = cache.lock().unwrap(); + cache.insert_read_value(key.to_vec(), &value); Ok(value) } async fn contains_key(&self, key: &[u8]) -> Result { - if let Some(values) = &self.lru_read_values { - let values = values.lock().unwrap(); - if let Some(value) = values.query(key) { - return Ok(value.is_some()); + let Some(cache) = &self.cache else { + return self.store.contains_key(key).await; + }; + { + let mut cache = cache.lock().unwrap(); + if let Some(result) = cache.query_contains_key(key) { + #[cfg(with_metrics)] + NUM_CACHE_VALUE_SUCCESS.with_label_values(&[]).inc(); + return Ok(result); } } - self.store.contains_key(key).await + #[cfg(with_metrics)] + NUM_CACHE_VALUE_FAULT.with_label_values(&[]).inc(); + let result = self.store.contains_key(key).await?; + let mut cache = cache.lock().unwrap(); + cache.insert_contains_key(key.to_vec(), result); + Ok(result) } async fn contains_keys(&self, keys: Vec>) -> Result, Self::Error> { - let Some(values) = &self.lru_read_values else { + let Some(cache) = &self.cache else { return self.store.contains_keys(keys).await; }; let size = keys.len(); @@ -167,19 +711,25 @@ where let mut indices = Vec::new(); let mut key_requests = Vec::new(); { - let values = values.lock().unwrap(); + let mut cache = cache.lock().unwrap(); for i in 0..size { - if let Some(value) = values.query(&keys[i]) { - results[i] = value.is_some(); + if let Some(result) = cache.query_contains_key(&keys[i]) { + #[cfg(with_metrics)] + NUM_CACHE_VALUE_SUCCESS.with_label_values(&[]).inc(); + results[i] = result; } else { + #[cfg(with_metrics)] + NUM_CACHE_VALUE_FAULT.with_label_values(&[]).inc(); indices.push(i); 
key_requests.push(keys[i].clone()); } } } - let key_results = self.store.contains_keys(key_requests).await?; - for (index, result) in indices.into_iter().zip(key_results) { + let key_results = self.store.contains_keys(key_requests.clone()).await?; + let mut cache = cache.lock().unwrap(); + for ((index, result), key) in indices.into_iter().zip(key_results).zip(key_requests) { results[index] = result; + cache.insert_contains_key(key, result); } Ok(results) } @@ -188,7 +738,7 @@ where &self, keys: Vec>, ) -> Result>>, Self::Error> { - let Some(lru_read_values) = &self.lru_read_values else { + let Some(cache) = &self.cache else { return self.store.read_multi_values_bytes(keys).await; }; @@ -196,15 +746,15 @@ where let mut cache_miss_indices = Vec::new(); let mut miss_keys = Vec::new(); { - let lru_read_values_container = lru_read_values.lock().unwrap(); + let mut cache = cache.lock().unwrap(); for (i, key) in keys.into_iter().enumerate() { - if let Some(value) = lru_read_values_container.query(&key) { + if let Some(value) = cache.query_read_value(&key) { #[cfg(with_metrics)] - NUM_CACHE_SUCCESS.with_label_values(&[]).inc(); - result.push(value.clone()); + NUM_CACHE_VALUE_SUCCESS.with_label_values(&[]).inc(); + result.push(value); } else { #[cfg(with_metrics)] - NUM_CACHE_FAULT.with_label_values(&[]).inc(); + NUM_CACHE_VALUE_FAULT.with_label_values(&[]).inc(); result.push(None); cache_miss_indices.push(i); miss_keys.push(key); @@ -215,53 +765,88 @@ where .store .read_multi_values_bytes(miss_keys.clone()) .await?; - let mut lru_read_values = lru_read_values.lock().unwrap(); + let mut cache = cache.lock().unwrap(); for (i, (key, value)) in cache_miss_indices .into_iter() .zip(miss_keys.into_iter().zip(values)) { - lru_read_values.insert(key, value.clone()); + cache.insert_read_value(key, &value); result[i] = value; } Ok(result) } - async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result { - self.store.find_keys_by_prefix(key_prefix).await + async fn find_keys_by_prefix(&self, key_prefix: &[u8]) -> Result>, Self::Error> { + let Some(cache) = &self.cache else { + return self.uncached_find_keys_by_prefix(key_prefix).await; + }; + { + let mut cache = cache.lock().unwrap(); + if let Some(value) = cache.query_find_keys(key_prefix) { + #[cfg(with_metrics)] + NUM_CACHE_FIND_SUCCESS.with_label_values(&[]).inc(); + return Ok(value); + } + } + #[cfg(with_metrics)] + NUM_CACHE_FIND_FAULT.with_label_values(&[]).inc(); + let keys = self.uncached_find_keys_by_prefix(key_prefix).await?; + let mut cache = cache.lock().unwrap(); + cache.insert_find_keys(key_prefix.to_vec(), &keys); + Ok(keys) } async fn find_key_values_by_prefix( &self, key_prefix: &[u8], ) -> Result { - self.store.find_key_values_by_prefix(key_prefix).await + let Some(cache) = &self.cache else { + return self.uncached_find_key_values_by_prefix(key_prefix).await; + }; + { + let mut cache = cache.lock().unwrap(); + if let Some(value) = cache.query_find_key_values(key_prefix) { + #[cfg(with_metrics)] + NUM_CACHE_FIND_SUCCESS.with_label_values(&[]).inc(); + return Ok(value); + } + } + #[cfg(with_metrics)] + NUM_CACHE_FIND_FAULT.with_label_values(&[]).inc(); + let key_values = self.uncached_find_key_values_by_prefix(key_prefix).await?; + let mut cache = cache.lock().unwrap(); + cache.insert_find_key_values(key_prefix.to_vec(), &key_values); + Ok(key_values) } } -impl WritableKeyValueStore for LruCachingStore +impl WritableKeyValueStore for CachingStore where K: WritableKeyValueStore + Send + Sync, { - // The LRU cache does not change the 
underlying store's size limits. + // The cache does not change the underlying store's size limits. const MAX_VALUE_SIZE: usize = K::MAX_VALUE_SIZE; async fn write_batch(&self, batch: Batch) -> Result<(), Self::Error> { - let Some(lru_read_values) = &self.lru_read_values else { + let Some(cache) = &self.cache else { return self.store.write_batch(batch).await; }; - { - let mut lru_read_values = lru_read_values.lock().unwrap(); + let mut cache = cache.lock().unwrap(); for operation in &batch.operations { match operation { WriteOperation::Put { key, value } => { - lru_read_values.insert(key.to_vec(), Some(value.to_vec())); + cache.correct_find_entry(key, Some(value.to_vec())); + let cache_entry = ValueCacheEntry::Value(value.to_vec()); + cache.insert_value(key.to_vec(), cache_entry); } WriteOperation::Delete { key } => { - lru_read_values.insert(key.to_vec(), None); + cache.correct_find_entry(key, None); + let cache_entry = ValueCacheEntry::DoesNotExist; + cache.insert_value(key.to_vec(), cache_entry); } WriteOperation::DeletePrefix { key_prefix } => { - lru_read_values.delete_prefix(key_prefix); + cache.delete_prefix(key_prefix); } } } @@ -274,28 +859,59 @@ where } } -fn new_lru_prefix_cache(cache_size: usize) -> Option>> { - if cache_size == 0 { +fn new_storage_prefix_cache( + storage_cache_policy: StorageCachePolicy, +) -> Option>> { + if storage_cache_policy.max_cache_size == 0 { None } else { - Some(Arc::new(Mutex::new(LruPrefixCache::new(cache_size)))) + let cache = StoragePrefixCache::new(storage_cache_policy); + Some(Arc::new(Mutex::new(cache))) } } -impl LruCachingStore +impl CachingStore where - K: RestrictedKeyValueStore, + K: RestrictedKeyValueStore + WithError, { - /// Creates a new key-value store that provides LRU caching at top of the given store. - pub fn new(store: K, cache_size: usize) -> Self { - let lru_read_values = new_lru_prefix_cache(cache_size); - Self { - store, - lru_read_values, + /// Creates a new key-value store that provides caching at top of the given store. + pub fn new(store: K, storage_cache_policy: StorageCachePolicy) -> Self { + let cache = new_storage_prefix_cache(storage_cache_policy); + Self { store, cache } + } +} + +impl CachingStore +where + K: ReadableKeyValueStore + WithError, +{ + async fn uncached_find_keys_by_prefix( + &self, + key_prefix: &[u8], + ) -> Result>, K::Error> { + let keys = self.store.find_keys_by_prefix(key_prefix).await?; + let mut keys_vec = Vec::new(); + for key in keys.iterator() { + keys_vec.push(key?.to_vec()); + } + Ok(keys_vec) + } + + async fn uncached_find_key_values_by_prefix( + &self, + key_prefix: &[u8], + ) -> Result, Vec)>, K::Error> { + let key_values = self.store.find_key_values_by_prefix(key_prefix).await?; + let mut key_values_vec = Vec::new(); + for key_value in key_values.iterator() { + let key_value = key_value?; + let key_value = (key_value.0.to_vec(), key_value.1.to_vec()); + key_values_vec.push(key_value); } + Ok(key_values_vec) } } /// A memory store with caching. 
#[cfg(with_testing)] -pub type LruCachingMemoryStore = LruCachingStore; +pub type CachingMemoryStore = CachingStore; diff --git a/linera-views/src/backends/memory.rs b/linera-views/src/backends/memory.rs index 52403485c1b1..0ef3c4293f49 100644 --- a/linera-views/src/backends/memory.rs +++ b/linera-views/src/backends/memory.rs @@ -18,6 +18,7 @@ use crate::store::TestKeyValueStore; use crate::{ batch::{Batch, WriteOperation}, common::get_interval, + lru_caching::DEFAULT_STORAGE_CACHE_POLICY, store::{ AdminKeyValueStore, CommonStoreConfig, KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, @@ -37,7 +38,7 @@ impl MemoryStoreConfig { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; Self { common_config } } @@ -280,7 +281,7 @@ impl MemoryStore { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; let config = MemoryStoreConfig { common_config }; let kill_on_drop = false; @@ -297,7 +298,7 @@ impl MemoryStore { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; let config = MemoryStoreConfig { common_config }; let kill_on_drop = true; @@ -325,7 +326,7 @@ impl AdminKeyValueStore for MemoryStore { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; let config = MemoryStoreConfig { common_config }; let mut memory_stores = MEMORY_STORES @@ -374,7 +375,7 @@ impl TestKeyValueStore for MemoryStore { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; Ok(MemoryStoreConfig { common_config }) } diff --git a/linera-views/src/backends/metering.rs b/linera-views/src/backends/metering.rs index df2b1d51611b..a68b9c793356 100644 --- a/linera-views/src/backends/metering.rs +++ b/linera-views/src/backends/metering.rs @@ -67,10 +67,20 @@ pub(crate) static SCYLLA_DB_METRICS: LazyLock = pub(crate) static VALUE_SPLITTING_METRICS: LazyLock = LazyLock::new(|| KeyValueStoreMetrics::new("value splitting".to_string())); -/// The metrics for the "lru caching" -#[cfg(any(with_rocksdb, with_dynamodb, with_scylladb))] -pub(crate) static LRU_CACHING_METRICS: LazyLock = - LazyLock::new(|| KeyValueStoreMetrics::new("lru caching".to_string())); +/// The metrics for the "cached rocks db" +#[cfg(with_rocksdb)] +pub(crate) static CACHED_ROCKS_DB_METRICS: LazyLock = + LazyLock::new(|| KeyValueStoreMetrics::new("cached rocks db".to_string())); + +/// The metrics for the "cached dynamo db" +#[cfg(with_dynamodb)] +pub(crate) static CACHED_DYNAMO_DB_METRICS: LazyLock = + LazyLock::new(|| KeyValueStoreMetrics::new("cached dynamo db".to_string())); + +/// The metrics for the "cached scylla db" +#[cfg(with_scylladb)] +pub(crate) static CACHED_SCYLLA_DB_METRICS: LazyLock = + LazyLock::new(|| KeyValueStoreMetrics::new("cached scylla db".to_string())); impl KeyValueStoreMetrics { /// Creation of a named Metered counter. 
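For reference, the `--storage-cache-policy` flags introduced above take the path to a JSON file that `read_storage_cache_policy` deserializes into a `StorageCachePolicy`. A hypothetical policy file spelling out the built-in defaults would look like:

```json
{
  "max_cache_size": 10000000,
  "max_entry_size": 1000000,
  "max_cache_entries": 1000
}
```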
diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index 080951cd75e4..c02d953c8401 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -16,12 +16,12 @@ use thiserror::Error; #[cfg(with_metrics)] use crate::metering::{ - MeteredStore, LRU_CACHING_METRICS, ROCKS_DB_METRICS, VALUE_SPLITTING_METRICS, + MeteredStore, CACHED_ROCKS_DB_METRICS, ROCKS_DB_METRICS, VALUE_SPLITTING_METRICS, }; use crate::{ batch::{Batch, WriteOperation}, common::get_upper_bound, - lru_caching::LruCachingStore, + lru_caching::{CachingStore, StorageCachePolicy}, store::{ AdminKeyValueStore, CommonStoreConfig, KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, @@ -29,7 +29,7 @@ use crate::{ value_splitting::{ValueSplittingError, ValueSplittingStore}, }; #[cfg(with_testing)] -use crate::{lru_caching::TEST_CACHE_SIZE, store::TestKeyValueStore}; +use crate::{lru_caching::DEFAULT_STORAGE_CACHE_POLICY, store::TestKeyValueStore}; /// The number of streams for the test #[cfg(with_testing)] @@ -60,6 +60,58 @@ pub enum RocksDbSpawnMode { BlockInPlace, } +/// The inner client +#[derive(Clone)] +struct RocksDbStoreInternal { + executor: RocksDbStoreExecutor, + _path_with_guard: PathWithGuard, + max_stream_queries: usize, + storage_cache_policy: StorageCachePolicy, + spawn_mode: RocksDbSpawnMode, +} + +impl RocksDbStoreInternal { + fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> { + if !namespace + .chars() + .all(|character| character.is_ascii_alphanumeric() || character == '_') + { + return Err(RocksDbStoreInternalError::InvalidNamespace); + } + Ok(()) + } + + fn build( + path_with_guard: PathWithGuard, + spawn_mode: RocksDbSpawnMode, + max_stream_queries: usize, + storage_cache_policy: &StorageCachePolicy, + root_key: &[u8], + ) -> Result { + let path = path_with_guard.path_buf.clone(); + if !std::path::Path::exists(&path) { + std::fs::create_dir(path.clone())?; + } + let mut options = rocksdb::Options::default(); + options.create_if_missing(true); + let db = DB::open(&options, path)?; + let root_key = root_key.to_vec(); + let root_key = root_key.to_vec(); + let executor = RocksDbStoreExecutor { + db: Arc::new(db), + root_key, + }; + let storage_cache_policy = storage_cache_policy.clone(); + Ok(RocksDbStoreInternal { + executor, + _path_with_guard: path_with_guard, + max_stream_queries, + storage_cache_policy, + spawn_mode, + }) + } +} + impl RocksDbSpawnMode { /// Obtains the spawning mode from runtime. 
pub fn get_spawn_mode_from_runtime() -> Self { @@ -262,16 +314,6 @@ impl RocksDbStoreExecutor { } } -/// The inner client -#[derive(Clone)] -pub struct RocksDbStoreInternal { - executor: RocksDbStoreExecutor, - _path_with_guard: PathWithGuard, - max_stream_queries: usize, - cache_size: usize, - spawn_mode: RocksDbSpawnMode, -} - /// The initial configuration of the system #[derive(Clone, Debug)] pub struct RocksDbStoreConfig { @@ -283,46 +325,6 @@ pub struct RocksDbStoreConfig { pub common_config: CommonStoreConfig, } -impl RocksDbStoreInternal { - fn check_namespace(namespace: &str) -> Result<(), RocksDbStoreInternalError> { - if !namespace - .chars() - .all(|character| character.is_ascii_alphanumeric() || character == '_') - { - return Err(RocksDbStoreInternalError::InvalidNamespace); - } - Ok(()) - } - - fn build( - path_with_guard: PathWithGuard, - spawn_mode: RocksDbSpawnMode, - max_stream_queries: usize, - cache_size: usize, - root_key: &[u8], - ) -> Result { - let path = path_with_guard.path_buf.clone(); - if !std::path::Path::exists(&path) { - std::fs::create_dir(path.clone())?; - } - let mut options = rocksdb::Options::default(); - options.create_if_missing(true); - let db = DB::open(&options, path)?; - let root_key = root_key.to_vec(); - let executor = RocksDbStoreExecutor { - db: Arc::new(db), - root_key, - }; - Ok(RocksDbStoreInternal { - executor, - _path_with_guard: path_with_guard, - max_stream_queries, - cache_size, - spawn_mode, - }) - } -} - impl WithError for RocksDbStoreInternal { type Error = RocksDbStoreInternalError; } @@ -451,13 +453,13 @@ impl AdminKeyValueStore for RocksDbStoreInternal { path_buf.push(namespace); path_with_guard.path_buf = path_buf; let max_stream_queries = config.common_config.max_stream_queries; - let cache_size = config.common_config.cache_size; + let storage_cache_policy = &config.common_config.storage_cache_policy; let spawn_mode = config.spawn_mode; RocksDbStoreInternal::build( path_with_guard, spawn_mode, max_stream_queries, - cache_size, + storage_cache_policy, root_key, ) } @@ -539,7 +541,7 @@ impl TestKeyValueStore for RocksDbStoreInternal { let common_config = CommonStoreConfig { max_concurrent_queries: None, max_stream_queries: TEST_ROCKS_DB_MAX_STREAM_QUERIES, - cache_size: TEST_CACHE_SIZE, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; let spawn_mode = RocksDbSpawnMode::get_spawn_mode_from_runtime(); Ok(RocksDbStoreConfig { @@ -598,15 +600,15 @@ impl KeyValueStoreError for RocksDbStoreInternalError { const BACKEND: &'static str = "rocks_db"; } -/// A shared DB client for RocksDB implementing LruCaching +/// A shared DB client for RocksDB implementing caching #[derive(Clone)] pub struct RocksDbStore { #[cfg(with_metrics)] store: MeteredStore< - LruCachingStore>>>, + CachingStore>>>, >, #[cfg(not(with_metrics))] - store: LruCachingStore>, + store: CachingStore>, } /// A path and the guard for the temporary directory if needed @@ -648,15 +650,18 @@ impl RocksDbStore { &self.store.store.store } - fn from_inner(store: RocksDbStoreInternal, cache_size: usize) -> RocksDbStore { + fn from_inner( + store: RocksDbStoreInternal, + storage_cache_policy: &StorageCachePolicy, + ) -> RocksDbStore { #[cfg(with_metrics)] let store = MeteredStore::new(&ROCKS_DB_METRICS, store); let store = ValueSplittingStore::new(store); #[cfg(with_metrics)] let store = MeteredStore::new(&VALUE_SPLITTING_METRICS, store); - let store = LruCachingStore::new(store, cache_size); + let store = CachingStore::new(store, storage_cache_policy.clone()); 
#[cfg(with_metrics)] - let store = MeteredStore::new(&LRU_CACHING_METRICS, store); + let store = MeteredStore::new(&CACHED_ROCKS_DB_METRICS, store); Self { store } } } @@ -732,14 +737,14 @@ impl AdminKeyValueStore for RocksDbStore { root_key: &[u8], ) -> Result { let store = RocksDbStoreInternal::connect(config, namespace, root_key).await?; - let cache_size = config.common_config.cache_size; - Ok(Self::from_inner(store, cache_size)) + let storage_cache_policy = &config.common_config.storage_cache_policy; + Ok(Self::from_inner(store, storage_cache_policy)) } fn clone_with_root_key(&self, root_key: &[u8]) -> Result { let store = self.inner().clone_with_root_key(root_key)?; - let cache_size = self.inner().cache_size; - Ok(Self::from_inner(store, cache_size)) + let storage_cache_policy = &self.inner().storage_cache_policy; + Ok(Self::from_inner(store, storage_cache_policy)) } async fn list_all(config: &Self::Config) -> Result, RocksDbStoreError> { diff --git a/linera-views/src/backends/scylla_db.rs b/linera-views/src/backends/scylla_db.rs index 378047cf38e1..799a85842677 100644 --- a/linera-views/src/backends/scylla_db.rs +++ b/linera-views/src/backends/scylla_db.rs @@ -30,19 +30,19 @@ use scylla::{ use thiserror::Error; #[cfg(with_metrics)] -use crate::metering::{MeteredStore, LRU_CACHING_METRICS, SCYLLA_DB_METRICS}; +use crate::metering::{MeteredStore, CACHED_SCYLLA_DB_METRICS, SCYLLA_DB_METRICS}; use crate::{ batch::{Batch, UnorderedBatch}, common::get_upper_bound_option, journaling::{DirectWritableKeyValueStore, JournalConsistencyError, JournalingKeyValueStore}, - lru_caching::LruCachingStore, + lru_caching::{CachingStore, StorageCachePolicy}, store::{ AdminKeyValueStore, CommonStoreConfig, KeyValueStoreError, ReadableKeyValueStore, WithError, WritableKeyValueStore, }, }; #[cfg(with_testing)] -use crate::{lru_caching::TEST_CACHE_SIZE, store::TestKeyValueStore}; +use crate::{lru_caching::DEFAULT_STORAGE_CACHE_POLICY, store::TestKeyValueStore}; /// The client for ScyllaDb. 
/// * The session allows to pass queries @@ -386,7 +386,7 @@ pub struct ScyllaDbStoreInternal { store: Arc, semaphore: Option>, max_stream_queries: usize, - cache_size: usize, + storage_cache_policy: StorageCachePolicy, root_key: Vec, } @@ -575,13 +575,13 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { .max_concurrent_queries .map(|n| Arc::new(Semaphore::new(n))); let max_stream_queries = config.common_config.max_stream_queries; - let cache_size = config.common_config.cache_size; + let storage_cache_policy = config.common_config.storage_cache_policy.clone(); let root_key = get_big_root_key(root_key); Ok(Self { store, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, root_key, }) } @@ -590,13 +590,13 @@ impl AdminKeyValueStore for ScyllaDbStoreInternal { let store = self.store.clone(); let semaphore = self.semaphore.clone(); let max_stream_queries = self.max_stream_queries; - let cache_size = self.cache_size; + let storage_cache_policy = self.storage_cache_policy.clone(); let root_key = get_big_root_key(root_key); Ok(Self { store, semaphore, max_stream_queries, - cache_size, + storage_cache_policy, root_key, }) } @@ -769,10 +769,9 @@ impl ScyllaDbStoreInternal { #[derive(Clone)] pub struct ScyllaDbStore { #[cfg(with_metrics)] - store: - MeteredStore>>>, + store: MeteredStore>>>, #[cfg(not(with_metrics))] - store: LruCachingStore>, + store: CachingStore>, } /// The type for building a new ScyllaDB Key Value Store @@ -856,15 +855,21 @@ impl AdminKeyValueStore for ScyllaDbStore { namespace: &str, root_key: &[u8], ) -> Result { - let cache_size = config.common_config.cache_size; + let storage_cache_policy = &config.common_config.storage_cache_policy; let simple_store = ScyllaDbStoreInternal::connect(config, namespace, root_key).await?; - Ok(ScyllaDbStore::from_inner(simple_store, cache_size)) + Ok(ScyllaDbStore::from_inner( + simple_store, + storage_cache_policy, + )) } fn clone_with_root_key(&self, root_key: &[u8]) -> Result { let simple_store = self.inner().clone_with_root_key(root_key)?; - let cache_size = self.inner().cache_size; - Ok(ScyllaDbStore::from_inner(simple_store, cache_size)) + let storage_cache_policy = &self.inner().storage_cache_policy; + Ok(ScyllaDbStore::from_inner( + simple_store, + storage_cache_policy, + )) } async fn list_all(config: &Self::Config) -> Result, ScyllaDbStoreError> { @@ -895,7 +900,7 @@ impl TestKeyValueStore for ScyllaDbStore { let common_config = CommonStoreConfig { max_concurrent_queries: Some(TEST_SCYLLA_DB_MAX_CONCURRENT_QUERIES), max_stream_queries: TEST_SCYLLA_DB_MAX_STREAM_QUERIES, - cache_size: TEST_CACHE_SIZE, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, }; Ok(ScyllaDbStoreConfig { uri, common_config }) } @@ -912,13 +917,16 @@ impl ScyllaDbStore { &self.store.store.store } - fn from_inner(simple_store: ScyllaDbStoreInternal, cache_size: usize) -> ScyllaDbStore { + fn from_inner( + simple_store: ScyllaDbStoreInternal, + storage_cache_policy: &StorageCachePolicy, + ) -> ScyllaDbStore { let store = JournalingKeyValueStore::new(simple_store); #[cfg(with_metrics)] let store = MeteredStore::new(&SCYLLA_DB_METRICS, store); - let store = LruCachingStore::new(store, cache_size); + let store = CachingStore::new(store, storage_cache_policy.clone()); #[cfg(with_metrics)] - let store = MeteredStore::new(&LRU_CACHING_METRICS, store); + let store = MeteredStore::new(&CACHED_SCYLLA_DB_METRICS, store); Self { store } } } diff --git a/linera-views/src/store.rs b/linera-views/src/store.rs index cc9257f63927..b6134cb6a51f 100644 --- 
a/linera-views/src/store.rs +++ b/linera-views/src/store.rs @@ -9,7 +9,12 @@ use serde::de::DeserializeOwned; #[cfg(with_testing)] use crate::random::generate_test_namespace; -use crate::{batch::Batch, common::from_bytes_option, views::ViewError}; +use crate::{ + batch::Batch, + common::from_bytes_option, + lru_caching::{StorageCachePolicy, DEFAULT_STORAGE_CACHE_POLICY}, + views::ViewError, +}; /// The common initialization parameters for the `KeyValueStore` #[derive(Debug, Clone)] @@ -18,8 +23,8 @@ pub struct CommonStoreConfig { pub max_concurrent_queries: Option, /// The number of streams used for the async streams. pub max_stream_queries: usize, - /// The cache size being used. - pub cache_size: usize, + /// The cache policy being used. + pub storage_cache_policy: StorageCachePolicy, } impl Default for CommonStoreConfig { @@ -27,7 +32,7 @@ impl Default for CommonStoreConfig { CommonStoreConfig { max_concurrent_queries: None, max_stream_queries: 10, - cache_size: 1000, + storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY, } } } diff --git a/linera-views/src/test_utils/mod.rs b/linera-views/src/test_utils/mod.rs index 6ca0de1de516..bb867ff64ce9 100644 --- a/linera-views/src/test_utils/mod.rs +++ b/linera-views/src/test_utils/mod.rs @@ -60,6 +60,32 @@ pub fn get_random_kset(rng: &mut R, n: usize, k: usize) -> Vec { values[..k].to_vec() } +/// Builds random keys with key doubled and random length, possible bytes are limited to critical values +pub fn get_random_key_values_doubled( + rng: &mut R, + key_prefix: Vec, + len_range: usize, + len_value: usize, + num_entries: usize, +) -> Vec<(Vec, Vec)> { + let mut key_values = BTreeMap::new(); + let key_poss = [0_u8, 10_u8, 255_u8]; + for _ in 0..num_entries { + let mut key = key_prefix.clone(); + let len = rng.gen_range(0..len_range); + for _ in 0..len { + let pos = rng.gen_range(0..key_poss.len()); + let val = key_poss[pos]; + for _ in 0..2 { + key.push(val); + } + } + let value = get_random_byte_vector(rng, &Vec::new(), len_value); + key_values.insert(key, value); + } + key_values.into_iter().collect::>() +} + /// Takes a random number generator, a key_prefix and generates /// pairs `(key, value)` with key obtained by appending 8 bytes at random to key_prefix /// and value obtained by appending 8 bytes to the trivial vector. 
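To make the new helper's output concrete, here is a minimal sketch (not part of the patch) that inspects the generated keys: each key is the prefix followed by pairs of identical bytes drawn from the critical values 0, 10 and 255, which stresses prefix handling around those values. It assumes the test utilities are reachable the same way they are used in `store_tests.rs` below.

```rust
// Minimal sketch, not part of the patch: the shape of the keys generated by
// `get_random_key_values_doubled`. Duplicate keys are merged internally, so at
// most `num_entries` pairs are returned.
use linera_views::{random::make_deterministic_rng, test_utils::get_random_key_values_doubled};

fn inspect_doubled_keys() {
    let mut rng = make_deterministic_rng();
    let key_prefix = vec![0];
    let key_values = get_random_key_values_doubled(&mut rng, key_prefix.clone(), 5, 1, 10);
    assert!(key_values.len() <= 10);
    for (key, value) in &key_values {
        // Keys start with the prefix, then contain an even number of bytes,
        // each chosen from {0, 10, 255} and repeated twice in a row.
        assert!(key.starts_with(&key_prefix));
        assert_eq!((key.len() - key_prefix.len()) % 2, 0);
        assert_eq!(value.len(), 1);
    }
}
```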
@@ -72,17 +98,17 @@ pub fn get_random_key_values_prefix( num_entries: usize, ) -> Vec<(Vec, Vec)> { loop { - let mut v_ret = Vec::new(); - let mut vector_set = HashSet::new(); + let mut key_values = Vec::new(); + let mut keys = HashSet::new(); for _ in 0..num_entries { - let v1 = get_random_byte_vector(rng, &key_prefix, len_key); - let v2 = get_random_byte_vector(rng, &Vec::new(), len_value); - let v12 = (v1.clone(), v2); - vector_set.insert(v1); - v_ret.push(v12); + let key = get_random_byte_vector(rng, &key_prefix, len_key); + let value = get_random_byte_vector(rng, &Vec::new(), len_value); + let key_value = (key.clone(), value); + keys.insert(key); + key_values.push(key_value); } - if vector_set.len() == num_entries { - return v_ret; + if keys.len() == num_entries { + return key_values; } } } @@ -382,6 +408,23 @@ async fn read_key_values_prefix( key_values } +async fn read_key_values( + key_value_store: &C, + key_prefix: &[u8], +) -> BTreeMap, Vec> { + let mut key_values = BTreeMap::new(); + for key_value in key_value_store + .find_key_values_by_prefix(key_prefix) + .await + .unwrap() + .iterator() + { + let (key, value) = key_value.unwrap(); + key_values.insert(key.to_vec(), value.to_vec()); + } + key_values +} + /// Writes and then reads data under a prefix, and verifies the result. pub async fn run_test_batch_from_blank( key_value_store: &C, @@ -414,6 +457,135 @@ pub async fn run_writes_from_blank(key_value_st } } +#[allow(clippy::type_complexity)] +fn get_possible_key_values_results( + key_values: &BTreeMap, Vec>, +) -> Vec<(Vec, BTreeMap, Vec>)> { + let mut key_prefixes: BTreeSet> = BTreeSet::new(); + for key in key_values.keys() { + for i in 1..key.len() { + let key_prefix = &key[..i]; + key_prefixes.insert(key_prefix.to_vec()); + } + } + let mut possibilities = Vec::new(); + for key_prefix in key_prefixes { + let mut key_values_found: BTreeMap, Vec> = BTreeMap::new(); + for (key, value) in key_values { + if key.starts_with(&key_prefix) { + let key_red = &key[key_prefix.len()..]; + key_values_found.insert(key_red.to_vec(), value.clone()); + } + } + possibilities.push((key_prefix.to_vec(), key_values_found)); + } + possibilities +} + +/// The `update_entry` function in the LRU can have some potential errors. +pub async fn run_lru_related_test1(store: &S) { + let mut rng = make_deterministic_rng(); + let key_prefix = vec![0]; + let len_key = 4; + let len_value = 1; + let num_entries = 3; + let num_iter = 4; + for _ in 0..num_iter { + let key_values = get_random_key_values_prefix( + &mut rng, + key_prefix.clone(), + len_key, + len_value, + num_entries, + ); + let mut batch_initial = Batch::new(); + let mut map_state = BTreeMap::new(); + for (key, value) in key_values { + map_state.insert(key.clone(), value.clone()); + batch_initial.put_key_value_bytes(key, value); + } + store.write_batch(batch_initial).await.unwrap(); + for _ in 0..num_entries { + let inside = rng.gen::(); + let remove = rng.gen::(); + let key = if inside { + let len = map_state.len(); + let pos = rng.gen_range(0..len); + let iter = map_state.iter().nth(pos); + let (key, _) = iter.unwrap(); + key.to_vec() + } else { + get_small_key_space(&mut rng, &key_prefix, 4) + }; + let mut batch_atomic = Batch::new(); + if remove { + batch_atomic.delete_key(key); + } else { + let value = get_small_key_space(&mut rng, &key_prefix, 4); + batch_atomic.put_key_value_bytes(key, value); + } + update_state_from_batch(&mut map_state, &batch_atomic); + // That operation changes the keys and the LRU has to update it. 
+ store.write_batch(batch_atomic).await.unwrap(); + // That operation loads the keys in the LRU cache + let key_values = read_key_values_prefix(store, &key_prefix).await; + assert_eq!(key_values, map_state); + } + let mut batch_clear = Batch::new(); + batch_clear.delete_key_prefix(key_prefix.clone()); + store.write_batch(batch_clear).await.unwrap(); + } +} + +/// The `start_pos / end_pos` indices can be tricky. +pub async fn run_lru_related_test2(store: &S) { + let mut rng = make_deterministic_rng(); + let key_prefix = vec![0]; + let len_range = 5; + let len_value = 1; + let num_entries = 10; + let num_iter = 4; + let num_removal = 5; + for _ in 0..num_iter { + let key_values = get_random_key_values_doubled( + &mut rng, + key_prefix.clone(), + len_range, + len_value, + num_entries, + ); + let mut batch_initial = Batch::new(); + let mut map_state = BTreeMap::new(); + for (key, value) in key_values { + map_state.insert(key.clone(), value.clone()); + batch_initial.put_key_value_bytes(key, value); + } + store.write_batch(batch_initial).await.unwrap(); + for _ in 0..num_removal { + let possibilities = get_possible_key_values_results(&map_state); + for (key_prefix, result) in &possibilities { + let key_values_read = read_key_values(store, key_prefix).await; + assert_eq!(&key_values_read, result); + } + let n_possibilities = possibilities.len(); + if n_possibilities > 0 { + let i_possibility = rng.gen_range(0..n_possibilities); + let key_prefix = possibilities[i_possibility].0.clone(); + let mut batch_atomic = Batch::new(); + batch_atomic.delete_key_prefix(key_prefix); + update_state_from_batch(&mut map_state, &batch_atomic); + // That operation changes the keys and the LRU has to update it. + store.write_batch(batch_atomic).await.unwrap(); + } + } + let possibilities = get_possible_key_values_results(&map_state); + for (key_prefix, result) in &possibilities { + let key_values_read = read_key_values(store, key_prefix).await; + assert_eq!(&key_values_read, result); + } + } +} + /// That test is especially challenging for ScyllaDB. /// In its default settings, Scylla has a limitation to 10000 tombstones. /// A tombstone is an indication that the data has been deleted. 
That diff --git a/linera-views/tests/store_tests.rs b/linera-views/tests/store_tests.rs index 727c03d9d170..17dc52811043 100644 --- a/linera-views/tests/store_tests.rs +++ b/linera-views/tests/store_tests.rs @@ -9,8 +9,8 @@ use linera_views::{ random::make_deterministic_rng, store::TestKeyValueStore as _, test_utils::{ - get_random_test_scenarios, run_big_write_read, run_reads, run_writes_from_blank, - run_writes_from_state, + get_random_test_scenarios, run_big_write_read, run_lru_related_test1, + run_lru_related_test2, run_reads, run_writes_from_blank, run_writes_from_state, }, value_splitting::create_value_splitting_memory_store, }; @@ -69,6 +69,36 @@ async fn test_reads_scylla_db() { } } +#[tokio::test] +async fn test_lru1_memory() { + let store = MemoryStore::new_test_store().await.unwrap(); + run_lru_related_test1(&store).await; +} + +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_lru1_scylla_db() { + let store = linera_views::scylla_db::ScyllaDbStore::new_test_store() + .await + .unwrap(); + run_lru_related_test1(&store).await; +} + +#[tokio::test] +async fn test_lru2_memory() { + let store = MemoryStore::new_test_store().await.unwrap(); + run_lru_related_test2(&store).await; +} + +#[cfg(with_scylladb)] +#[tokio::test] +async fn test_lru2_scylla_db() { + let store = linera_views::scylla_db::ScyllaDbStore::new_test_store() + .await + .unwrap(); + run_lru_related_test2(&store).await; +} + #[cfg(with_indexeddb)] #[wasm_bindgen_test] async fn test_reads_indexed_db() { diff --git a/linera-views/tests/views_tests.rs b/linera-views/tests/views_tests.rs index 7165a2af6510..91dcb528f260 100644 --- a/linera-views/tests/views_tests.rs +++ b/linera-views/tests/views_tests.rs @@ -22,7 +22,7 @@ use linera_views::{ context::{create_test_memory_context, Context, MemoryContext, ViewContext}, key_value_store_view::{KeyValueStoreView, ViewContainer}, log_view::HashedLogView, - lru_caching::{LruCachingMemoryStore, LruCachingStore}, + lru_caching::{CachingMemoryStore, CachingStore, DEFAULT_STORAGE_CACHE_POLICY}, map_view::{ByteMapView, HashedMapView}, memory::MemoryStore, queue_view::HashedQueueView, @@ -120,20 +120,20 @@ impl StateStorage for KeyValueStoreTestStorage { } } -pub struct LruMemoryStorage { +pub struct CachedMemoryStorage { accessed_chains: BTreeSet, - store: LruCachingStore, + store: CachingStore, } #[async_trait] -impl StateStorage for LruMemoryStorage { - type Context = ViewContext; +impl StateStorage for CachedMemoryStorage { + type Context = ViewContext; async fn new() -> Self { let store = MemoryStore::new_test_store().await.unwrap(); - let cache_size = 1000; - let store = LruCachingStore::new(store, cache_size); - LruMemoryStorage { + let storage_cache_policy = DEFAULT_STORAGE_CACHE_POLICY; + let store = CachingStore::new(store, storage_cache_policy); + CachedMemoryStorage { accessed_chains: BTreeSet::new(), store, } @@ -653,18 +653,18 @@ async fn test_byte_map_view() -> Result<()> { } #[cfg(test)] -async fn test_views_in_lru_memory_param(config: &TestConfig) -> Result<()> { - tracing::warn!("Testing config {:?} with lru memory", config); - let mut store = LruMemoryStorage::new().await; +async fn test_views_in_cached_memory_param(config: &TestConfig) -> Result<()> { + tracing::warn!("Testing config {:?} with storage memory", config); + let mut store = CachedMemoryStorage::new().await; test_store(&mut store, config).await?; assert_eq!(store.accessed_chains.len(), 1); Ok(()) } #[tokio::test] -async fn test_views_in_lru_memory() -> Result<()> { +async fn 
test_views_in_cached_memory() -> Result<()> { for config in TestConfig::samples() { - test_views_in_lru_memory_param(&config).await?; + test_views_in_cached_memory_param(&config).await?; } Ok(()) }
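For completeness, a minimal sketch (not part of the patch) of wrapping a store in the renamed caching layer directly, mirroring `CachedMemoryStorage::new` above. It only uses names introduced by this patch and assumes a test build where `CachingMemoryStore` and `new_test_store` are available.

```rust
// Minimal sketch, not part of the patch: wrapping a store in the caching layer.
// `CachingMemoryStore` is the `CachingStore<MemoryStore>` alias introduced by
// this patch (test builds only); `new_test_store` comes from `TestKeyValueStore`.
use linera_views::{
    lru_caching::{CachingMemoryStore, CachingStore, DEFAULT_STORAGE_CACHE_POLICY},
    memory::MemoryStore,
    store::TestKeyValueStore as _,
};

async fn cached_memory_store() -> CachingMemoryStore {
    let store = MemoryStore::new_test_store().await.unwrap();
    // The policy, rather than a bare entry count, now decides what is cached.
    CachingStore::new(store, DEFAULT_STORAGE_CACHE_POLICY)
}
```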