Skip to content

Commit

Permalink
Extend the cache support.
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuDutSik committed Oct 23, 2024
1 parent 9e03718 commit 81c2b9e
Show file tree
Hide file tree
Showing 23 changed files with 1,161 additions and 294 deletions.
4 changes: 1 addition & 3 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp
* `--max-stream-queries <MAX_STREAM_QUERIES>` — The maximal number of simultaneous stream queries to the database

Default value: `10`
* `--cache-size <CACHE_SIZE>` — The maximal number of entries in the storage cache

Default value: `1000`
* `--storage-cache-policy <STORAGE_CACHE_POLICY>` — The storage cache policy
* `--retry-delay-ms <RETRY_DELAY>` — Delay increment for retrying to connect to a validator

Default value: `1000`
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use linera_core::client::BlanketMessagePolicy;
use linera_execution::{
committee::ValidatorName, ResourceControlPolicy, WasmRuntime, WithWasmDefault as _,
};
use linera_views::store::CommonStoreConfig;
use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy};

#[cfg(feature = "fs")]
use crate::config::GenesisConfig;
Expand Down Expand Up @@ -104,9 +104,9 @@ pub struct ClientOptions {
#[arg(long, default_value = "10")]
pub max_stream_queries: usize,

/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
pub cache_size: usize,
/// The storage cache policy
#[arg(long)]
pub storage_cache_policy: Option<String>,

/// Subcommand.
#[command(subcommand)]
Expand Down Expand Up @@ -167,10 +167,11 @@ impl ClientOptions {
}

fn common_config(&self) -> CommonStoreConfig {
let storage_cache_policy = read_storage_cache_policy(self.storage_cache_policy.clone());
CommonStoreConfig {
max_concurrent_queries: self.max_concurrent_queries,
max_stream_queries: self.max_stream_queries,
cache_size: self.cache_size,
storage_cache_policy,
}
}

Expand Down
11 changes: 7 additions & 4 deletions linera-indexer/lib/src/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::path::PathBuf;

use clap::Parser as _;
use linera_views::{
lru_caching::read_storage_cache_policy,
rocks_db::{PathWithGuard, RocksDbSpawnMode, RocksDbStore, RocksDbStoreConfig},
store::{AdminKeyValueStore, CommonStoreConfig},
};
Expand All @@ -28,20 +29,22 @@ pub struct RocksDbConfig {
/// The maximal number of simultaneous stream queries to the database
#[arg(long, default_value = "10")]
pub max_stream_queries: usize,
/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
cache_size: usize,
/// The file of the storage cache policy
#[arg(long)]
storage_cache_policy: Option<String>,
}

pub type RocksDbRunner = Runner<RocksDbStore, RocksDbConfig>;

impl RocksDbRunner {
pub async fn load() -> Result<Self, IndexerError> {
let config = IndexerConfig::<RocksDbConfig>::parse();
let storage_cache_policy =
read_storage_cache_policy(config.client.storage_cache_policy.clone());
let common_config = CommonStoreConfig {
max_concurrent_queries: config.client.max_concurrent_queries,
max_stream_queries: config.client.max_stream_queries,
cache_size: config.client.cache_size,
storage_cache_policy,
};
let path_buf = config.client.storage.as_path().to_path_buf();
let path_with_guard = PathWithGuard::new(path_buf);
Expand Down
11 changes: 7 additions & 4 deletions linera-indexer/lib/src/scylla_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use linera_views::{
lru_caching::read_storage_cache_policy,
scylla_db::{ScyllaDbStore, ScyllaDbStoreConfig},
store::{AdminKeyValueStore, CommonStoreConfig},
};
Expand All @@ -25,20 +26,22 @@ pub struct ScyllaDbConfig {
/// The maximal number of simultaneous stream queries to the database
#[arg(long, default_value = "10")]
pub max_stream_queries: usize,
/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
cache_size: usize,
/// The storage cache policy
#[arg(long)]
storage_cache_policy: Option<String>,
}

pub type ScyllaDbRunner = Runner<ScyllaDbStore, ScyllaDbConfig>;

impl ScyllaDbRunner {
pub async fn load() -> Result<Self, IndexerError> {
let config = <IndexerConfig<ScyllaDbConfig> as clap::Parser>::parse();
let storage_cache_policy =
read_storage_cache_policy(config.client.storage_cache_policy.clone());
let common_config = CommonStoreConfig {
max_concurrent_queries: config.client.max_concurrent_queries,
max_stream_queries: config.client.max_stream_queries,
cache_size: config.client.cache_size,
storage_cache_policy,
};
let namespace = config.client.table.clone();
let root_key = &[];
Expand Down
11 changes: 6 additions & 5 deletions linera-service/src/proxy/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use linera_rpc::{
use linera_service::prometheus_server;
use linera_service::util;
use linera_storage::Storage;
use linera_views::store::CommonStoreConfig;
use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument};
Expand Down Expand Up @@ -68,9 +68,9 @@ pub struct ProxyOptions {
#[arg(long, default_value = "10")]
max_stream_queries: usize,

/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
cache_size: usize,
/// The storage cache policy
#[arg(long)]
storage_cache_policy: Option<String>,

/// Path to the file describing the initial user chains (aka genesis state)
#[arg(long = "genesis")]
Expand Down Expand Up @@ -366,10 +366,11 @@ fn main() -> Result<()> {

impl ProxyOptions {
async fn run(&self) -> Result<()> {
let storage_cache_policy = read_storage_cache_policy(self.storage_cache_policy.clone());
let common_config = CommonStoreConfig {
max_concurrent_queries: self.max_concurrent_queries,
max_stream_queries: self.max_stream_queries,
cache_size: self.cache_size,
storage_cache_policy,
};
let full_storage_config = self.storage_config.add_common_config(common_config).await?;
let genesis_config: GenesisConfig = util::read_json(&self.genesis_config_path)?;
Expand Down
24 changes: 13 additions & 11 deletions linera-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use linera_rpc::{
use linera_service::prometheus_server;
use linera_service::util;
use linera_storage::Storage;
use linera_views::store::CommonStoreConfig;
use linera_views::{store::CommonStoreConfig, lru_caching::read_storage_cache_policy};
use serde::Deserialize;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -350,9 +350,9 @@ enum ServerCommand {
#[arg(long, default_value = "10")]
max_stream_queries: usize,

/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
cache_size: usize,
/// The storage cache policy
#[arg(long)]
storage_cache_policy: Option<String>,
},

/// Act as a trusted third-party and generate all server configurations
Expand Down Expand Up @@ -391,9 +391,9 @@ enum ServerCommand {
#[arg(long, default_value = "10")]
max_stream_queries: usize,

/// The maximal number of entries in the storage cache.
#[arg(long, default_value = "1000")]
cache_size: usize,
/// The storage cache policy
#[arg(long)]
storage_cache_policy: Option<String>,
},
}

Expand Down Expand Up @@ -459,7 +459,7 @@ async fn run(options: ServerOptions) {
wasm_runtime,
max_concurrent_queries,
max_stream_queries,
cache_size,
storage_cache_policy,
} => {
let genesis_config: GenesisConfig =
util::read_json(&genesis_config_path).expect("Fail to read initial chain config");
Expand All @@ -481,10 +481,11 @@ async fn run(options: ServerOptions) {
grace_period,
};
let wasm_runtime = wasm_runtime.with_wasm_default();
let storage_cache_policy = read_storage_cache_policy(storage_cache_policy);
let common_config = CommonStoreConfig {
max_concurrent_queries,
max_stream_queries,
cache_size,
storage_cache_policy,
};
let full_storage_config = storage_config
.add_common_config(common_config)
Expand Down Expand Up @@ -541,14 +542,15 @@ async fn run(options: ServerOptions) {
genesis_config_path,
max_concurrent_queries,
max_stream_queries,
cache_size,
storage_cache_policy,
} => {
let genesis_config: GenesisConfig =
util::read_json(&genesis_config_path).expect("Fail to read initial chain config");
let storage_cache_policy = read_storage_cache_policy(storage_cache_policy);
let common_config = CommonStoreConfig {
max_concurrent_queries,
max_stream_queries,
cache_size,
storage_cache_policy,
};
let full_storage_config = storage_config
.add_common_config(common_config)
Expand Down
35 changes: 19 additions & 16 deletions linera-storage-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use linera_base::ensure;
use linera_views::metering::MeteredStore;
use linera_views::{
batch::{Batch, WriteOperation},
lru_caching::LruCachingStore,
lru_caching::{CachingStore, StorageCachePolicy, DEFAULT_STORAGE_CACHE_POLICY},
store::{
AdminKeyValueStore, CommonStoreConfig, ReadableKeyValueStore, WithError,
WritableKeyValueStore,
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct ServiceStoreClientInternal {
channel: Channel,
semaphore: Option<Arc<Semaphore>>,
max_stream_queries: usize,
cache_size: usize,
storage_cache_policy: StorageCachePolicy,
namespace: Vec<u8>,
root_key: Vec<u8>,
}
Expand Down Expand Up @@ -400,14 +400,14 @@ impl AdminKeyValueStore for ServiceStoreClientInternal {
.max_concurrent_queries
.map(|n| Arc::new(Semaphore::new(n)));
let max_stream_queries = config.common_config.max_stream_queries;
let cache_size = config.common_config.cache_size;
let storage_cache_policy = config.common_config.storage_cache_policy.clone();
let namespace = Self::namespace_as_vec(namespace)?;
let root_key = root_key.to_vec();
Ok(Self {
channel,
semaphore,
max_stream_queries,
cache_size,
storage_cache_policy,
namespace,
root_key,
})
Expand All @@ -417,14 +417,14 @@ impl AdminKeyValueStore for ServiceStoreClientInternal {
let channel = self.channel.clone();
let semaphore = self.semaphore.clone();
let max_stream_queries = self.max_stream_queries;
let cache_size = self.cache_size;
let storage_cache_policy = self.storage_cache_policy.clone();
let namespace = self.namespace.clone();
let root_key = root_key.to_vec();
Ok(Self {
channel,
semaphore,
max_stream_queries,
cache_size,
storage_cache_policy,
namespace,
root_key,
})
Expand Down Expand Up @@ -503,11 +503,11 @@ impl TestKeyValueStore for ServiceStoreClientInternal {
/// Creates the `CommonStoreConfig` for the `ServiceStoreClientInternal`.
pub fn create_service_store_common_config() -> CommonStoreConfig {
let max_stream_queries = 100;
let cache_size = 10; // unused
let storage_cache_policy = DEFAULT_STORAGE_CACHE_POLICY;
CommonStoreConfig {
max_concurrent_queries: None,
max_stream_queries,
cache_size,
storage_cache_policy,
}
}

Expand Down Expand Up @@ -552,9 +552,9 @@ pub async fn create_service_test_store() -> Result<ServiceStoreClientInternal, S
#[derive(Clone)]
pub struct ServiceStoreClient {
#[cfg(with_metrics)]
store: MeteredStore<LruCachingStore<MeteredStore<ServiceStoreClientInternal>>>,
store: MeteredStore<CachingStore<MeteredStore<ServiceStoreClientInternal>>>,
#[cfg(not(with_metrics))]
store: LruCachingStore<ServiceStoreClientInternal>,
store: CachingStore<ServiceStoreClientInternal>,
}

impl WithError for ServiceStoreClient {
Expand Down Expand Up @@ -624,15 +624,15 @@ impl AdminKeyValueStore for ServiceStoreClient {
namespace: &str,
root_key: &[u8],
) -> Result<Self, ServiceStoreError> {
let cache_size = config.common_config.cache_size;
let storage_cache_policy = config.common_config.storage_cache_policy.clone();
let store = ServiceStoreClientInternal::connect(config, namespace, root_key).await?;
Ok(ServiceStoreClient::from_inner(store, cache_size))
Ok(ServiceStoreClient::from_inner(store, storage_cache_policy))
}

fn clone_with_root_key(&self, root_key: &[u8]) -> Result<Self, ServiceStoreError> {
let store = self.inner().clone_with_root_key(root_key)?;
let cache_size = self.inner().cache_size;
Ok(ServiceStoreClient::from_inner(store, cache_size))
let storage_cache_policy = self.inner().storage_cache_policy.clone();
Ok(ServiceStoreClient::from_inner(store, storage_cache_policy))
}

async fn list_all(config: &Self::Config) -> Result<Vec<String>, ServiceStoreError> {
Expand Down Expand Up @@ -674,10 +674,13 @@ impl ServiceStoreClient {
&self.store.store
}

fn from_inner(store: ServiceStoreClientInternal, cache_size: usize) -> ServiceStoreClient {
fn from_inner(
store: ServiceStoreClientInternal,
storage_cache_policy: StorageCachePolicy,
) -> ServiceStoreClient {
#[cfg(with_metrics)]
let store = MeteredStore::new(&STORAGE_SERVICE_METRICS, store);
let store = LruCachingStore::new(store, cache_size);
let store = CachingStore::new(store, storage_cache_policy);
#[cfg(with_metrics)]
let store = MeteredStore::new(&LRU_STORAGE_SERVICE_METRICS, store);
Self { store }
Expand Down
4 changes: 3 additions & 1 deletion linera-storage-service/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::path::PathBuf;
use std::sync::LazyLock;

use linera_base::command::resolve_binary;
#[cfg(with_testing)]
use linera_views::lru_caching::DEFAULT_STORAGE_CACHE_POLICY;
#[cfg(with_metrics)]
use linera_views::metering::KeyValueStoreMetrics;
use linera_views::{
Expand Down Expand Up @@ -83,7 +85,7 @@ pub fn create_shared_store_common_config() -> CommonStoreConfig {
CommonStoreConfig {
max_concurrent_queries: Some(TEST_SHARED_STORE_MAX_CONCURRENT_QUERIES),
max_stream_queries: TEST_SHARED_STORE_MAX_STREAM_QUERIES,
cache_size: usize::MAX,
storage_cache_policy: DEFAULT_STORAGE_CACHE_POLICY,
}
}

Expand Down
1 change: 1 addition & 0 deletions linera-views/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ rand = { workspace = true, features = ["small_rng"] }
rocksdb = { workspace = true, optional = true }
scylla = { workspace = true, optional = true }
serde.workspace = true
serde_json.workspace = true
sha3.workspace = true
static_assertions.workspace = true
tempfile.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion linera-views/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ We provide an implementation of the trait `KeyValueStore` for the following key-
The trait `KeyValueStore` was designed so that more storage solutions can be easily added in the future.

The `KeyValueStore` trait is also implemented for several internal constructions of clients:
* The `LruCachingStore<K>` client implements the Least Recently Used (LRU)
* The `CachingStore<K>` client implements the Least Recently Used (LRU)
caching of reads into the client.
* The `ViewContainer<C>` client implements a key-value store client from a context.
* The `ValueSplittingStore<K>` implements a client for which the
Expand Down
Loading

0 comments on commit 81c2b9e

Please sign in to comment.