diff --git a/crates/bifrost/src/providers/replicated_loglet/provider.rs b/crates/bifrost/src/providers/replicated_loglet/provider.rs index caaf42b3cb..51f45aed92 100644 --- a/crates/bifrost/src/providers/replicated_loglet/provider.rs +++ b/crates/bifrost/src/providers/replicated_loglet/provider.rs @@ -121,8 +121,13 @@ impl ReplicatedLogletProvider { active_loglets: Default::default(), _metadata_store_client: metadata_store_client, networking, - // todo(asoli): read memory budget from ReplicatedLogletOptions - record_cache: RecordCache::new(20_000_000), // 20MB + record_cache: RecordCache::new( + Configuration::pinned() + .bifrost + .replicated_loglet + .record_cache_memory_size + .as_usize(), + ), logserver_rpc_routers, sequencer_rpc_routers, } diff --git a/crates/bifrost/src/providers/replicated_loglet/record_cache.rs b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs index 9f5bfaa0e2..b6fcc418e7 100644 --- a/crates/bifrost/src/providers/replicated_loglet/record_cache.rs +++ b/crates/bifrost/src/providers/replicated_loglet/record_cache.rs @@ -27,27 +27,39 @@ type RecordKey = (ReplicatedLogletId, LogletOffset); /// RemoteSequencers #[derive(Clone)] pub struct RecordCache { - inner: Cache, + inner: Option>, } impl RecordCache { + /// Creates a new instance of RecordCache. If memory budget is None + /// cache will be disabled pub fn new(memory_budget_bytes: usize) -> Self { - let inner: Cache = CacheBuilder::default() - .weigher(|_, record: &Record| { - (size_of::() + record.estimated_encode_size()) - .try_into() - .unwrap_or(u32::MAX) - }) - .max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX)) - .eviction_policy(EvictionPolicy::lru()) - .build(); + let inner = if memory_budget_bytes > 0 { + Some( + CacheBuilder::default() + .weigher(|_, record: &Record| { + (size_of::() + record.estimated_encode_size()) + .try_into() + .unwrap_or(u32::MAX) + }) + .max_capacity(memory_budget_bytes.try_into().unwrap_or(u64::MAX)) + .eviction_policy(EvictionPolicy::lru()) + .build(), + ) + } else { + None + }; Self { inner } } /// Writes a record to cache externally pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) { - self.inner.insert((loglet_id, offset), record); + let Some(ref inner) = self.inner else { + return; + }; + + inner.insert((loglet_id, offset), record); } /// Extend cache with records @@ -57,14 +69,20 @@ impl RecordCache { mut first_offset: LogletOffset, records: I, ) { + let Some(ref inner) = self.inner else { + return; + }; + for record in records.as_ref() { - self.inner.insert((loglet_id, first_offset), record.clone()); + inner.insert((loglet_id, first_offset), record.clone()); first_offset = first_offset.next(); } } /// Get a for given loglet id and offset. pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option { - self.inner.get(&(loglet_id, offset)) + let inner = self.inner.as_ref()?; + + inner.get(&(loglet_id, offset)) } } diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index 2e54dd9229..2cf052dfef 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -15,7 +15,7 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use restate_serde_util::NonZeroByteCount; +use restate_serde_util::{ByteCount, NonZeroByteCount}; use tracing::warn; use crate::logs::metadata::ProviderKind; @@ -237,6 +237,14 @@ pub struct ReplicatedLogletOptions { /// /// Retry policy for log server RPCs pub log_server_retry_policy: RetryPolicy, + + /// # In-memory RecordCache memory limit + /// + /// Optional size of record cache in bytes. + /// If set to 0, record cache will be disabled. + /// Defaults: 20M + #[cfg_attr(feature = "schemars", schemars(with = "ByteCount"))] + pub record_cache_memory_size: ByteCount, } impl Default for ReplicatedLogletOptions { @@ -257,6 +265,7 @@ impl Default for ReplicatedLogletOptions { Some(10), Some(Duration::from_millis(2000)), ), + record_cache_memory_size: 20_000_000u64.into(), // 20MB } } }