From 776efa3b65cf92f2524fd349d5d720fe69807f07 Mon Sep 17 00:00:00 2001 From: Aleksandr Logunov Date: Fri, 26 Aug 2022 19:54:16 +0400 Subject: [PATCH] feat: improved shard cache (#7429) Co-authored-by: firatNEAR Co-authored-by: firatNEAR <102993450+firatNEAR@users.noreply.github.com> --- CHANGELOG.md | 3 + chain/chain/src/store.rs | 8 + core/store/src/config.rs | 4 +- core/store/src/lib.rs | 19 +- core/store/src/metrics.rs | 134 +++++++- core/store/src/trie/shard_tries.rs | 44 ++- core/store/src/trie/trie_storage.rs | 324 +++++++++++++++++- core/store/src/trie/trie_tests.rs | 23 +- .../src/estimator_context.rs | 8 +- 9 files changed, 523 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a2cfa1f76e2..9103498786c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,9 @@ * `network.external_address` field in `config.json` file is deprecated. In fact it has never been used and only served to confuse everyone [#7300](https://github.com/near/nearcore/pull/7300) +* Due to increasing state size, improved shard cache for Trie nodes to + put more nodes in memory. Requires 3 GB more RAM + [#7429](https://github.com/near/nearcore/pull/7429) ## 1.28.0 [2022-07-27] diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index dc03847a0d6..09fbde1c823 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -2775,8 +2775,14 @@ impl<'a> ChainStoreUpdate<'a> { block_hash, )?; } + + // Convert trie changes to database ops for trie nodes. + // Create separate store update for deletions, because we want to update cache and don't want to remove nodes + // from the store. + let mut deletions_store_update = self.store().store_update(); for mut wrapped_trie_changes in self.trie_changes.drain(..) { wrapped_trie_changes.insertions_into(&mut store_update); + wrapped_trie_changes.deletions_into(&mut deletions_store_update); wrapped_trie_changes.state_changes_into(&mut store_update); if self.chain_store.save_trie_changes { @@ -2785,6 +2791,8 @@ impl<'a> ChainStoreUpdate<'a> { .map_err(|err| Error::Other(err.to_string()))?; } } + deletions_store_update.update_cache()?; + for ((block_hash, shard_id), state_changes) in self.add_state_changes_for_split_states.drain() { diff --git a/core/store/src/config.rs b/core/store/src/config.rs index 2551534612f..25b96f6c4c3 100644 --- a/core/store/src/config.rs +++ b/core/store/src/config.rs @@ -41,7 +41,7 @@ pub struct StoreConfig { pub block_size: bytesize::ByteSize, /// Trie cache capacities - /// Default value: ShardUId {version: 1, shard_id: 3} -> 2_000_000. TODO: clarify + /// Default value: ShardUId {version: 1, shard_id: 3} -> 45_000_000 /// We're still experimenting with this parameter and it seems decreasing its value can improve /// the performance of the storage pub trie_cache_capacities: Vec<(ShardUId, usize)>, @@ -102,7 +102,7 @@ impl Default for StoreConfig { // we use it since then. block_size: bytesize::ByteSize::kib(16), - trie_cache_capacities: Default::default(), + trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 45_000_000)], } } } diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 262062ba18d..eafbd2b2011 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -356,6 +356,17 @@ impl StoreUpdate { self.transaction.merge(other.transaction) } + pub fn update_cache(&self) -> io::Result<()> { + if let Some(tries) = &self.shard_tries { + // Note: avoid comparing wide pointers here to work-around + // https://github.com/rust-lang/rust/issues/69757 + let addr = |arc| Arc::as_ptr(arc) as *const u8; + assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),); + tries.update_cache(&self.transaction)?; + } + Ok(()) + } + pub fn commit(self) -> io::Result<()> { debug_assert!( { @@ -376,13 +387,7 @@ impl StoreUpdate { "Transaction overwrites itself: {:?}", self ); - if let Some(tries) = self.shard_tries { - // Note: avoid comparing wide pointers here to work-around - // https://github.com/rust-lang/rust/issues/69757 - let addr = |arc| Arc::as_ptr(arc) as *const u8; - assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),); - tries.update_cache(&self.transaction)?; - } + self.update_cache()?; let _span = tracing::trace_span!(target: "store", "commit").entered(); for op in &self.transaction.ops { match op { diff --git a/core/store/src/metrics.rs b/core/store/src/metrics.rs index e677530af21..3477ebac715 100644 --- a/core/store/src/metrics.rs +++ b/core/store/src/metrics.rs @@ -1,4 +1,7 @@ -use near_metrics::{try_create_histogram_vec, HistogramVec}; +use near_metrics::{ + try_create_histogram_vec, try_create_int_counter_vec, try_create_int_gauge_vec, HistogramVec, + IntCounterVec, IntGaugeVec, +}; use once_cell::sync::Lazy; pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy = Lazy::new(|| { @@ -10,3 +13,132 @@ pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub static CHUNK_CACHE_HITS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_chunk_cache_hits", + "Chunk cache hits", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static CHUNK_CACHE_MISSES: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_chunk_cache_misses", + "Chunk cache misses", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static SHARD_CACHE_HITS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_hits", + "Shard cache hits", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static SHARD_CACHE_MISSES: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_misses", + "Shard cache misses", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static SHARD_CACHE_TOO_LARGE: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_too_large", + "Number of values to be inserted into shard cache is too large", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static SHARD_CACHE_SIZE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec("near_shard_cache_size", "Shard cache size", &["shard_id", "is_view"]) + .unwrap() +}); + +pub static CHUNK_CACHE_SIZE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec("near_chunk_cache_size", "Chunk cache size", &["shard_id", "is_view"]) + .unwrap() +}); + +pub static SHARD_CACHE_CURRENT_TOTAL_SIZE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_shard_cache_current_total_size", + "Shard cache current total size", + &["shard_id", "is_view"], + ) + .unwrap() +}); + +pub static SHARD_CACHE_POP_HITS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_pop_hits", + "Shard cache pop hits", + &["shard_id", "is_view"], + ) + .unwrap() +}); +pub static SHARD_CACHE_POP_MISSES: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_pop_misses", + "Shard cache pop misses", + &["shard_id", "is_view"], + ) + .unwrap() +}); +pub static SHARD_CACHE_POP_LRU: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_pop_lru", + "Shard cache LRU pops", + &["shard_id", "is_view"], + ) + .unwrap() +}); +pub static SHARD_CACHE_GC_POP_MISSES: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_shard_cache_gc_pop_misses", + "Shard cache gc pop misses", + &["shard_id", "is_view"], + ) + .unwrap() +}); +pub static SHARD_CACHE_DELETIONS_SIZE: Lazy = Lazy::new(|| { + try_create_int_gauge_vec( + "near_shard_cache_deletions_size", + "Shard cache deletions size", + &["shard_id", "is_view"], + ) + .unwrap() +}); +pub static APPLIED_TRIE_DELETIONS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_applied_trie_deletions", + "Trie deletions applied to store", + &["shard_id"], + ) + .unwrap() +}); +pub static APPLIED_TRIE_INSERTIONS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_applied_trie_insertions", + "Trie insertions applied to store", + &["shard_id"], + ) + .unwrap() +}); +pub static REVERTED_TRIE_INSERTIONS: Lazy = Lazy::new(|| { + try_create_int_counter_vec( + "near_reverted_trie_insertions", + "Trie insertions reverted due to GC of forks", + &["shard_id"], + ) + .unwrap() +}); diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 25ec14beeef..6d16685d1e4 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -21,7 +21,7 @@ use near_primitives::state_record::is_delayed_receipt_key; use crate::flat_state::FlatState; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR}; -use crate::{DBCol, DBOp, DBTransaction}; +use crate::{metrics, DBCol, DBOp, DBTransaction}; use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate}; /// Responsible for creation of trie caches, stores necessary configuration for it. @@ -42,20 +42,24 @@ impl TrieCacheFactory { } /// Create new cache for the given shard uid. - pub fn create_cache(&self, shard_uid: &ShardUId) -> TrieCache { - match self.capacities.get(shard_uid) { - Some(capacity) => TrieCache::with_capacity(*capacity), - None => TrieCache::new(), + pub fn create_cache(&self, shard_uid: &ShardUId, is_view: bool) -> TrieCache { + let capacity = if is_view { None } else { self.capacities.get(shard_uid) }; + match capacity { + Some(capacity) => TrieCache::with_capacities(*capacity, shard_uid.shard_id, is_view), + None => TrieCache::new(shard_uid.shard_id, is_view), } } /// Create caches on the initialization of storage structures. - pub fn create_initial_caches(&self) -> HashMap { + pub fn create_initial_caches(&self, is_view: bool) -> HashMap { assert_ne!(self.num_shards, 0); let shards: Vec<_> = (0..self.num_shards) .map(|shard_id| ShardUId { version: self.shard_version, shard_id: shard_id as u32 }) .collect(); - shards.iter().map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid))).collect() + shards + .iter() + .map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid, is_view))) + .collect() } } @@ -73,8 +77,8 @@ pub struct ShardTries(Arc); impl ShardTries { pub fn new(store: Store, trie_cache_factory: TrieCacheFactory) -> Self { - let caches = trie_cache_factory.create_initial_caches(); - let view_caches = trie_cache_factory.create_initial_caches(); + let caches = trie_cache_factory.create_initial_caches(false); + let view_caches = trie_cache_factory.create_initial_caches(true); ShardTries(Arc::new(ShardTriesInner { store, trie_cache_factory, @@ -112,10 +116,11 @@ impl ShardTries { let mut caches = caches_to_use.write().expect(POISONED_LOCK_ERR); caches .entry(shard_uid) - .or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid)) + .or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, is_view)) .clone() }; - let storage = Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid)); + let storage = + Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid, is_view)); let flat_state = { #[cfg(feature = "protocol_feature_flat_state")] if use_flat_state { @@ -177,7 +182,7 @@ impl ShardTries { for (shard_uid, ops) in shards { let cache = caches .entry(shard_uid) - .or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid)) + .or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, false)) .clone(); cache.update_cache(ops); } @@ -238,6 +243,9 @@ impl ShardTries { shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { + metrics::APPLIED_TRIE_INSERTIONS + .with_label_values(&[&format!("{}", shard_uid.shard_id)]) + .inc_by(trie_changes.insertions.len() as u64); self.apply_insertions_inner(&trie_changes.insertions, shard_uid, store_update) } @@ -247,6 +255,9 @@ impl ShardTries { shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { + metrics::APPLIED_TRIE_DELETIONS + .with_label_values(&[&format!("{}", shard_uid.shard_id)]) + .inc_by(trie_changes.deletions.len() as u64); self.apply_deletions_inner(&trie_changes.deletions, shard_uid, store_update) } @@ -256,6 +267,9 @@ impl ShardTries { shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { + metrics::REVERTED_TRIE_INSERTIONS + .with_label_values(&[&format!("{}", shard_uid.shard_id)]) + .inc_by(trie_changes.insertions.len() as u64); self.apply_deletions_inner(&trie_changes.insertions, shard_uid, store_update) } @@ -321,10 +335,16 @@ impl WrappedTrieChanges { &self.state_changes } + /// Save insertions of trie nodes into Store. pub fn insertions_into(&self, store_update: &mut StoreUpdate) { self.tries.apply_insertions(&self.trie_changes, self.shard_uid, store_update) } + /// Save deletions of trie nodes into Store. + pub fn deletions_into(&self, store_update: &mut StoreUpdate) { + self.tries.apply_deletions(&self.trie_changes, self.shard_uid, store_update) + } + /// Save state changes into Store. /// /// NOTE: the changes are drained from `self`. diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs index 04069072cbc..f961cbff5e8 100644 --- a/core/store/src/trie/trie_storage.rs +++ b/core/store/src/trie/trie_storage.rs @@ -1,33 +1,207 @@ use std::borrow::Borrow; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::{Arc, Mutex}; use near_primitives::hash::CryptoHash; use crate::db::refcount::decode_value_with_rc; use crate::trie::POISONED_LOCK_ERR; -use crate::{DBCol, StorageError, Store}; +use crate::{metrics, DBCol, StorageError, Store}; use lru::LruCache; +use near_o11y::log_assert; use near_primitives::shard_layout::ShardUId; use near_primitives::types::{TrieCacheMode, TrieNodesCount}; use std::cell::{Cell, RefCell}; use std::io::ErrorKind; -/// Wrapper over LruCache which doesn't hold too large elements. +pub(crate) struct BoundedQueue { + queue: VecDeque, + /// If queue size exceeds capacity, item from the tail is removed. + capacity: usize, +} + +impl BoundedQueue { + pub(crate) fn new(capacity: usize) -> Self { + // Reserve space for one extra element to simplify `put`. + Self { queue: VecDeque::with_capacity(capacity + 1), capacity } + } + + pub(crate) fn clear(&mut self) { + self.queue.clear(); + } + + pub(crate) fn pop(&mut self) -> Option { + self.queue.pop_front() + } + + pub(crate) fn put(&mut self, key: T) -> Option { + self.queue.push_back(key); + if self.queue.len() > self.capacity { + Some(self.pop().expect("Queue cannot be empty")) + } else { + None + } + } + + pub(crate) fn len(&self) -> usize { + self.queue.len() + } +} + +/// In-memory cache for trie items - nodes and values. All nodes are stored in the LRU cache with three modifications. +/// 1) Size of each value must not exceed `TRIE_LIMIT_CACHED_VALUE_SIZE`. +/// Needed to avoid caching large values like contract codes. +/// 2) If we put new value to LRU cache and total size of existing values exceeds `total_sizes_capacity`, we evict +/// values from it until that is no longer the case. So the actual total size should never exceed +/// `total_size_limit` + `TRIE_LIMIT_CACHED_VALUE_SIZE`. +/// Needed because value sizes generally vary from 1 B to 500 B and we want to count cache size precisely. +/// 3) If value is popped, it is put to the `deletions` queue with `deletions_queue_capacity` first. If popped value +/// doesn't fit in the queue, the last value is removed from the queue and LRU cache, and newly popped value is inserted +/// to the queue. +/// Needed to delay deletions when we have forks. In such case, many blocks can share same parent, and we want to keep +/// old nodes in cache for a while to process all new roots. For example, it helps to read old state root. +pub struct TrieCacheInner { + /// LRU cache keeping mapping from keys to values. + cache: LruCache>, + /// Queue of items which were popped, which postpones deletion of old nodes. + deletions: BoundedQueue, + /// Current total size of all values in the cache. + total_size: u64, + /// Upper bound for the total size. + total_size_limit: u64, + /// Shard id of the nodes being cached. + shard_id: u32, + /// Whether cache is used for view calls execution. + is_view: bool, +} + +impl TrieCacheInner { + pub(crate) fn new( + cache_capacity: usize, + deletions_queue_capacity: usize, + total_size_limit: u64, + shard_id: u32, + is_view: bool, + ) -> Self { + assert!(cache_capacity > 0 && total_size_limit > 0); + Self { + cache: LruCache::new(cache_capacity), + deletions: BoundedQueue::new(deletions_queue_capacity), + total_size: 0, + total_size_limit, + shard_id, + is_view, + } + } + + pub(crate) fn get(&mut self, key: &CryptoHash) -> Option> { + self.cache.get(key).cloned() + } + + pub(crate) fn clear(&mut self) { + self.total_size = 0; + self.deletions.clear(); + self.cache.clear(); + } + + pub(crate) fn put(&mut self, key: CryptoHash, value: Arc<[u8]>) { + let metrics_labels: [&str; 2] = + [&format!("{}", self.shard_id), &format!("{}", self.is_view as u8)]; + while self.total_size > self.total_size_limit || self.cache.len() == self.cache.cap() { + // First, try to evict value using the key from deletions queue. + match self.deletions.pop() { + Some(key) => match self.cache.pop(&key) { + Some(value) => { + metrics::SHARD_CACHE_POP_HITS.with_label_values(&metrics_labels).inc(); + self.total_size -= value.len() as u64; + continue; + } + None => { + metrics::SHARD_CACHE_POP_MISSES.with_label_values(&metrics_labels).inc(); + } + }, + None => {} + } + + // Second, pop LRU value. + metrics::SHARD_CACHE_POP_LRU.with_label_values(&metrics_labels).inc(); + let (_, value) = + self.cache.pop_lru().expect("Cannot fail because total size capacity is > 0"); + self.total_size -= value.len() as u64; + } + + // Add value to the cache. + self.total_size += value.len() as u64; + match self.cache.push(key, value) { + Some((evicted_key, evicted_value)) => { + log_assert!(key == evicted_key, "LRU cache with shard_id = {}, is_view = {} can't be full before inserting key {}", self.shard_id, self.is_view, key); + self.total_size -= evicted_value.len() as u64; + } + None => {} + }; + } + + // Adds key to the deletions queue if it is present in cache. + // Returns key-value pair which are popped if deletions queue is full. + pub(crate) fn pop(&mut self, key: &CryptoHash) -> Option<(CryptoHash, Arc<[u8]>)> { + let metrics_labels: [&str; 2] = + [&format!("{}", self.shard_id), &format!("{}", self.is_view as u8)]; + metrics::SHARD_CACHE_DELETIONS_SIZE + .with_label_values(&metrics_labels) + .set(self.deletions.len() as i64); + // Do nothing if key was removed before. + if self.cache.contains(key) { + // Put key to the queue of deletions and possibly remove another key we have to delete. + match self.deletions.put(key.clone()) { + Some(key_to_delete) => match self.cache.pop(&key_to_delete) { + Some(evicted_value) => { + metrics::SHARD_CACHE_POP_HITS.with_label_values(&metrics_labels).inc(); + self.total_size -= evicted_value.len() as u64; + Some((key_to_delete, evicted_value)) + } + None => { + metrics::SHARD_CACHE_POP_MISSES.with_label_values(&metrics_labels).inc(); + None + } + }, + None => None, + } + } else { + metrics::SHARD_CACHE_GC_POP_MISSES.with_label_values(&metrics_labels).inc(); + None + } + } + + pub fn len(&self) -> usize { + self.cache.len() + } + + pub fn current_total_size(&self) -> u64 { + self.total_size + } +} + +/// Wrapper over LruCache to handle concurrent access. #[derive(Clone)] -pub struct TrieCache(Arc>>>); +pub struct TrieCache(Arc>); impl TrieCache { - pub fn new() -> Self { - Self::with_capacity(TRIE_DEFAULT_SHARD_CACHE_SIZE) + pub fn new(shard_id: u32, is_view: bool) -> Self { + Self::with_capacities(TRIE_DEFAULT_SHARD_CACHE_SIZE, shard_id, is_view) } - pub fn with_capacity(cap: usize) -> Self { - Self(Arc::new(Mutex::new(LruCache::new(cap)))) + pub fn with_capacities(cap: usize, shard_id: u32, is_view: bool) -> Self { + Self(Arc::new(Mutex::new(TrieCacheInner::new( + cap, + DEFAULT_SHARD_CACHE_DELETIONS_QUEUE_CAPACITY, + DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT, + shard_id, + is_view, + )))) } pub fn get(&self, key: &CryptoHash) -> Option> { - self.0.lock().expect(POISONED_LOCK_ERR).get(key).cloned() + self.0.lock().expect(POISONED_LOCK_ERR).get(key) } pub fn clear(&self) { @@ -36,11 +210,15 @@ impl TrieCache { pub fn update_cache(&self, ops: Vec<(CryptoHash, Option<&Vec>)>) { let mut guard = self.0.lock().expect(POISONED_LOCK_ERR); + let metrics_labels: [&str; 2] = + [&format!("{}", guard.shard_id), &format!("{}", guard.is_view as u8)]; for (hash, opt_value_rc) in ops { if let Some(value_rc) = opt_value_rc { if let (Some(value), _rc) = decode_value_with_rc(&value_rc) { if value.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { guard.put(hash, value.into()); + } else { + metrics::SHARD_CACHE_TOO_LARGE.with_label_values(&metrics_labels).inc(); } } else { guard.pop(&hash); @@ -150,10 +328,23 @@ impl TrieStorage for TrieMemoryPartialStorage { /// with 512 MB limit. The total RAM usage for a single shard was 1 GB. #[cfg(not(feature = "no_cache"))] const TRIE_DEFAULT_SHARD_CACHE_SIZE: usize = 50000; - #[cfg(feature = "no_cache")] const TRIE_DEFAULT_SHARD_CACHE_SIZE: usize = 1; +/// Default total size of values which may simultaneously exist the cache. +/// It is chosen by the estimation of the largest contract storage size we are aware as of 23/08/2022. +#[cfg(not(feature = "no_cache"))] +const DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT: u64 = 3_000_000_000; +#[cfg(feature = "no_cache")] +const DEFAULT_SHARD_CACHE_TOTAL_SIZE_LIMIT: u64 = 1; + +/// Capacity for the deletions queue. +/// It is chosen to fit all hashes of deleted nodes for 3 completely full blocks. +#[cfg(not(feature = "no_cache"))] +const DEFAULT_SHARD_CACHE_DELETIONS_QUEUE_CAPACITY: usize = 100_000; +#[cfg(feature = "no_cache")] +const DEFAULT_SHARD_CACHE_DELETIONS_QUEUE_CAPACITY: usize = 1; + /// Values above this size (in bytes) are never cached. /// Note that most of Trie inner nodes are smaller than this - e.g. branches use around 32 * 16 = 512 bytes. pub(crate) const TRIE_LIMIT_CACHED_VALUE_SIZE: usize = 1000; @@ -161,6 +352,7 @@ pub(crate) const TRIE_LIMIT_CACHED_VALUE_SIZE: usize = 1000; pub struct TrieCachingStorage { pub(crate) store: Store, pub(crate) shard_uid: ShardUId, + is_view: bool, /// Caches ever requested items for the shard `shard_uid`. Used to speed up DB operations, presence of any item is /// not guaranteed. @@ -182,10 +374,16 @@ pub struct TrieCachingStorage { } impl TrieCachingStorage { - pub fn new(store: Store, shard_cache: TrieCache, shard_uid: ShardUId) -> TrieCachingStorage { + pub fn new( + store: Store, + shard_cache: TrieCache, + shard_uid: ShardUId, + is_view: bool, + ) -> TrieCachingStorage { TrieCachingStorage { store, shard_uid, + is_view, shard_cache, cache_mode: Cell::new(TrieCacheMode::CachingShard), chunk_cache: RefCell::new(Default::default()), @@ -231,21 +429,35 @@ impl TrieCachingStorage { impl TrieStorage for TrieCachingStorage { fn retrieve_raw_bytes(&self, hash: &CryptoHash) -> Result, StorageError> { + let metrics_labels: [&str; 2] = + [&format!("{}", self.shard_uid.shard_id), &format!("{}", self.is_view as u8)]; + + metrics::CHUNK_CACHE_SIZE + .with_label_values(&metrics_labels) + .set(self.chunk_cache.borrow().len() as i64); // Try to get value from chunk cache containing nodes with cheaper access. We can do it for any `TrieCacheMode`, // because we charge for reading nodes only when `CachingChunk` mode is enabled anyway. if let Some(val) = self.chunk_cache.borrow_mut().get(hash) { + metrics::CHUNK_CACHE_HITS.with_label_values(&metrics_labels).inc(); self.inc_mem_read_nodes(); return Ok(val.clone()); } + metrics::CHUNK_CACHE_MISSES.with_label_values(&metrics_labels).inc(); // Try to get value from shard cache containing most recently touched nodes. let mut guard = self.shard_cache.0.lock().expect(POISONED_LOCK_ERR); + metrics::SHARD_CACHE_SIZE.with_label_values(&metrics_labels).set(guard.len() as i64); + metrics::SHARD_CACHE_CURRENT_TOTAL_SIZE + .with_label_values(&metrics_labels) + .set(guard.current_total_size() as i64); let val = match guard.get(hash) { Some(val) => { + metrics::SHARD_CACHE_HITS.with_label_values(&metrics_labels).inc(); near_o11y::io_trace!(count: "shard_cache_hit"); val.clone() } None => { + metrics::SHARD_CACHE_MISSES.with_label_values(&metrics_labels).inc(); near_o11y::io_trace!(count: "shard_cache_miss"); // If value is not present in cache, get it from the storage. let key = Self::get_key_from_shard_uid_and_hash(self.shard_uid, hash); @@ -265,6 +477,7 @@ impl TrieStorage for TrieCachingStorage { if val.len() < TRIE_LIMIT_CACHED_VALUE_SIZE { guard.put(*hash, val.clone()); } else { + metrics::SHARD_CACHE_TOO_LARGE.with_label_values(&metrics_labels).inc(); near_o11y::io_trace!(count: "shard_cache_too_large"); } @@ -298,3 +511,92 @@ impl TrieStorage for TrieCachingStorage { TrieNodesCount { db_reads: self.db_read_nodes.get(), mem_reads: self.mem_read_nodes.get() } } } + +#[cfg(test)] +mod bounded_queue_tests { + use crate::trie::trie_storage::BoundedQueue; + + #[test] + fn test_put_pop() { + let mut queue = BoundedQueue::new(2); + assert_eq!(queue.put(1), None); + assert_eq!(queue.put(2), None); + assert_eq!(queue.put(3), Some(1)); + + assert_eq!(queue.pop(), Some(2)); + assert_eq!(queue.pop(), Some(3)); + assert_eq!(queue.pop(), None); + } + + #[test] + fn test_clear() { + let mut queue = BoundedQueue::new(2); + queue.put(1); + assert_eq!(queue.queue.get(0), Some(&1)); + queue.clear(); + assert!(queue.queue.is_empty()); + } + + #[test] + fn test_zero_capacity() { + let mut queue = BoundedQueue::new(0); + assert_eq!(queue.put(1), Some(1)); + assert!(queue.queue.is_empty()); + } +} + +#[cfg(test)] +mod trie_cache_tests { + use crate::trie::trie_storage::TrieCacheInner; + use near_primitives::hash::hash; + + fn put_value(cache: &mut TrieCacheInner, value: &[u8]) { + cache.put(hash(value), value.into()); + } + + #[test] + fn test_size_limit() { + let mut cache = TrieCacheInner::new(100, 100, 5, 0, false); + // Add three values. Before each put, condition on total size should not be triggered. + put_value(&mut cache, &[1, 1]); + assert_eq!(cache.total_size, 2); + put_value(&mut cache, &[1, 1, 1]); + assert_eq!(cache.total_size, 5); + put_value(&mut cache, &[1]); + assert_eq!(cache.total_size, 6); + + // Add one of previous values. LRU value should be evicted. + put_value(&mut cache, &[1, 1, 1]); + assert_eq!(cache.total_size, 4); + assert_eq!(cache.cache.pop_lru(), Some((hash(&[1]), vec![1].into()))); + assert_eq!(cache.cache.pop_lru(), Some((hash(&[1, 1, 1]), vec![1, 1, 1].into()))); + } + + #[test] + fn test_deletions_queue() { + let mut cache = TrieCacheInner::new(100, 2, 100, 0, false); + // Add two values to the cache. + put_value(&mut cache, &[1]); + put_value(&mut cache, &[1, 1]); + + // Call pop for inserted values. Because deletions queue is not full, no elements should be deleted. + assert_eq!(cache.pop(&hash(&[1, 1])), None); + assert_eq!(cache.pop(&hash(&[1])), None); + + // Call pop two times for a value existing in cache. Because queue is full, both elements should be deleted. + assert_eq!(cache.pop(&hash(&[1])), Some((hash(&[1, 1]), vec![1, 1].into()))); + assert_eq!(cache.pop(&hash(&[1])), Some((hash(&[1]), vec![1].into()))); + } + + #[test] + fn test_cache_capacity() { + let mut cache = TrieCacheInner::new(2, 100, 100, 0, false); + put_value(&mut cache, &[1]); + put_value(&mut cache, &[2]); + put_value(&mut cache, &[3]); + + assert!(!cache.cache.contains(&hash(&[1]))); + assert!(cache.cache.contains(&hash(&[2]))); + assert!(cache.cache.contains(&hash(&[3]))); + } +} diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs index 9abe817a0c8..5ffcf08b896 100644 --- a/core/store/src/trie/trie_tests.rs +++ b/core/store/src/trie/trie_tests.rs @@ -234,8 +234,9 @@ mod caching_storage_tests { let values = vec![value.clone()]; let shard_uid = ShardUId::single_shard(); let store = create_store_with_values(&values, shard_uid); - let trie_cache = TrieCache::new(); - let trie_caching_storage = TrieCachingStorage::new(store, trie_cache.clone(), shard_uid); + let trie_cache = TrieCache::new(0, false); + let trie_caching_storage = + TrieCachingStorage::new(store, trie_cache.clone(), shard_uid, false); let key = hash(&value); assert_eq!(trie_cache.get(&key), None); @@ -255,7 +256,8 @@ mod caching_storage_tests { fn test_retrieve_error() { let shard_uid = ShardUId::single_shard(); let store = create_test_store(); - let trie_caching_storage = TrieCachingStorage::new(store, TrieCache::new(), shard_uid); + let trie_caching_storage = + TrieCachingStorage::new(store, TrieCache::new(0, false), shard_uid, false); let value = vec![1u8]; let key = hash(&value); @@ -270,8 +272,9 @@ mod caching_storage_tests { let values = vec![value.clone()]; let shard_uid = ShardUId::single_shard(); let store = create_store_with_values(&values, shard_uid); - let trie_cache = TrieCache::new(); - let trie_caching_storage = TrieCachingStorage::new(store, trie_cache.clone(), shard_uid); + let trie_cache = TrieCache::new(0, false); + let trie_caching_storage = + TrieCachingStorage::new(store, trie_cache.clone(), shard_uid, false); let key = hash(&value); trie_caching_storage.set_mode(TrieCacheMode::CachingChunk); @@ -292,8 +295,9 @@ mod caching_storage_tests { let values = vec![vec![1u8]]; let shard_uid = ShardUId::single_shard(); let store = create_store_with_values(&values, shard_uid); - let trie_cache = TrieCache::new(); - let trie_caching_storage = TrieCachingStorage::new(store, trie_cache.clone(), shard_uid); + let trie_cache = TrieCache::new(0, false); + let trie_caching_storage = + TrieCachingStorage::new(store, trie_cache.clone(), shard_uid, false); let value = &values[0]; let key = hash(&value); @@ -340,8 +344,9 @@ mod caching_storage_tests { let values: Vec> = (0..shard_cache_size as u8 + 1).map(|i| vec![i]).collect(); let shard_uid = ShardUId::single_shard(); let store = create_store_with_values(&values, shard_uid); - let trie_cache = TrieCache::with_capacity(shard_cache_size); - let trie_caching_storage = TrieCachingStorage::new(store, trie_cache.clone(), shard_uid); + let trie_cache = TrieCache::with_capacities(shard_cache_size, 0, false); + let trie_caching_storage = + TrieCachingStorage::new(store, trie_cache.clone(), shard_uid, false); let value = &values[0]; let key = hash(&value); diff --git a/runtime/runtime-params-estimator/src/estimator_context.rs b/runtime/runtime-params-estimator/src/estimator_context.rs index da06897003b..2ddb8b2b76f 100644 --- a/runtime/runtime-params-estimator/src/estimator_context.rs +++ b/runtime/runtime-params-estimator/src/estimator_context.rs @@ -123,8 +123,12 @@ impl<'c> Testbed<'c> { pub(crate) fn trie_caching_storage(&mut self) -> TrieCachingStorage { let store = self.inner.store(); - let caching_storage = - TrieCachingStorage::new(store, TrieCache::new(), ShardUId::single_shard()); + let caching_storage = TrieCachingStorage::new( + store, + TrieCache::new(0, false), + ShardUId::single_shard(), + false, + ); caching_storage }