feat: improved shard cache #7429

Merged 47 commits on Aug 26, 2022
Changes from 27 commits

Commits (47)
3f35f27
Introduce Storage Metrics for Prometheus
firatNEAR Aug 16, 2022
9aa35a7
new shard cache
Looogarithm Aug 17, 2022
a1e4a62
minor fix
Looogarithm Aug 17, 2022
8080aa3
Fix estimator
firatNEAR Aug 18, 2022
adf8034
warning fixes
Looogarithm Aug 18, 2022
a139dec
separate deletions queue struct, fix push bug
Looogarithm Aug 18, 2022
158db1a
bounded queue tests
Looogarithm Aug 19, 2022
8534a85
fix pub
Looogarithm Aug 19, 2022
bf9b814
move const
Looogarithm Aug 19, 2022
0451f05
move const
Looogarithm Aug 19, 2022
2e53081
move const
Looogarithm Aug 19, 2022
9cd170c
cache size limit
Looogarithm Aug 19, 2022
81f6648
synctriecache test
Looogarithm Aug 19, 2022
01db7a9
synctriecache test
Looogarithm Aug 19, 2022
2258f8d
test deletions queue removal
Looogarithm Aug 19, 2022
54171d7
test cache capacity
Looogarithm Aug 19, 2022
520fee9
Merge branch 'master' into smart-cache-1
Longarithm Aug 19, 2022
2b7f7bf
Fix comments
firatNEAR Aug 19, 2022
f1ced93
Merge branch 'feat/add_storage_metrics' into smart-cache-1
Looogarithm Aug 19, 2022
9e3108b
temporary fixes
Looogarithm Aug 19, 2022
e41e0c6
fix capacity for view cache
Looogarithm Aug 20, 2022
381f5f9
add new metrics
Looogarithm Aug 20, 2022
5de3996
add default capacities
Looogarithm Aug 20, 2022
27ef6ac
size limit
Looogarithm Aug 20, 2022
aa4df83
continue
Looogarithm Aug 20, 2022
14028bf
fix labels
Looogarithm Aug 22, 2022
d43a90a
Merge branch 'master' into smart-cache-1
Longarithm Aug 22, 2022
3c6b38a
fix capacity
Looogarithm Aug 22, 2022
434f6fc
Merge branch 'smart-cache-1' of github.com:near/nearcore into smart-c…
Looogarithm Aug 22, 2022
ce4c842
Add total_size metric
firatNEAR Aug 22, 2022
b2bec9a
Merge branch 'master' into smart-cache-1
firatNEAR Aug 22, 2022
474af0a
Fix mutex lock
firatNEAR Aug 22, 2022
ae1db75
Merge branch 'smart-cache-1' of github.com:near/nearcore into smart-c…
firatNEAR Aug 22, 2022
2dd713e
move deletions_into call to commit
Looogarithm Aug 23, 2022
4f8ad9c
add log_assert
Looogarithm Aug 23, 2022
b558989
size assertion
Looogarithm Aug 23, 2022
5eeca03
fix log_assert and metrics
Looogarithm Aug 23, 2022
7cb42ec
comments
Looogarithm Aug 23, 2022
1241305
fix assertion
Looogarithm Aug 23, 2022
bb75f49
Merge branch 'master' into smart-cache-1
Longarithm Aug 23, 2022
824dfee
put deletions to separate store update
Looogarithm Aug 23, 2022
bc37898
Merge branch 'smart-cache-1' of github.com:near/nearcore into smart-c…
Looogarithm Aug 23, 2022
3c31f7c
update changelog
Looogarithm Aug 26, 2022
95cd339
Merge branch 'master' into smart-cache-1
Longarithm Aug 26, 2022
470a063
mention ram in changelog
Looogarithm Aug 26, 2022
5666a84
4 -> 3
Looogarithm Aug 26, 2022
40f5914
Merge branch 'master' into smart-cache-1
Longarithm Aug 26, 2022
10 changes: 5 additions & 5 deletions chain/chain/src/chain.rs
@@ -4533,7 +4533,7 @@ impl<'a> ChainUpdate<'a> {
&result.shard_uid,
new_chunk_extra,
);
self.chain_store_update.save_trie_changes(result.trie_changes);
self.chain_store_update.save_trie_changes(result.trie_changes)?;
}
assert_eq!(sum_gas_used, total_gas_used);
assert_eq!(sum_balance_burnt, total_balance_burnt);
@@ -4580,7 +4580,7 @@ impl<'a> ChainUpdate<'a> {
apply_result.total_balance_burnt,
),
);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_trie_changes(apply_result.trie_changes)?;
self.chain_store_update.save_outgoing_receipt(
&block_hash,
shard_id,
@@ -4614,7 +4614,7 @@ impl<'a> ChainUpdate<'a> {
*new_extra.state_root_mut() = apply_result.new_root;

self.chain_store_update.save_chunk_extra(&block_hash, &shard_uid, new_extra);
self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_trie_changes(apply_result.trie_changes)?;

if let Some(apply_results_or_state_changes) = apply_split_result_or_state_changes {
self.process_split_state(
@@ -4966,7 +4966,7 @@ impl<'a> ChainUpdate<'a> {

self.chain_store_update.save_chunk(chunk);

self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_trie_changes(apply_result.trie_changes)?;
let chunk_extra = ChunkExtra::new(
&apply_result.new_root,
outcome_root,
@@ -5045,7 +5045,7 @@ impl<'a> ChainUpdate<'a> {
Default::default(),
)?;

self.chain_store_update.save_trie_changes(apply_result.trie_changes);
self.chain_store_update.save_trie_changes(apply_result.trie_changes)?;

let mut new_chunk_extra = ChunkExtra::clone(&chunk_extra);
*new_chunk_extra.state_root_mut() = apply_result.new_root;
10 changes: 9 additions & 1 deletion chain/chain/src/store.rs
@@ -1818,8 +1818,16 @@ impl<'a> ChainStoreUpdate<'a> {
self.chain_store_cache_update.outcome_ids.insert((*block_hash, shard_id), outcome_ids);
}

pub fn save_trie_changes(&mut self, trie_changes: WrappedTrieChanges) {
// fix me: catch error everywhere
A reviewer (project contributor) commented on this function:
The error handling here seems seriously off. I think this function can't actually fail, and that ? in

                        let (shard_uid, hash) =
                            TrieCachingStorage::get_shard_uid_and_hash_from_key(key)?;

is wrong, and should have been .unwrap() instead. We are not getting this data from the database, this is all purely in-memory, so any erroneous data is a programming error, and not the corrupted database.

I don't think we should do this refactor right now, as it is urgent, but this absolutely has to be fixed after the release is out. Propagating impossible error conditions is a tech debt that spills over.

pub fn save_trie_changes(&mut self, trie_changes: WrappedTrieChanges) -> Result<(), Error> {
// Pop deleted trie nodes from the shard cache.
// TODO: get rid of creating `StoreUpdate`
let mut store_update = self.store().store_update();
trie_changes.deletions_into(&mut store_update);
store_update.update_cache()?;

self.trie_changes.push(trie_changes);
Ok(())
}

pub fn add_state_changes_for_split_states(
2 changes: 1 addition & 1 deletion chain/chain/src/tests/gc.rs
@@ -137,7 +137,7 @@ fn do_fork(
Default::default(),
*block.hash(),
);
store_update.save_trie_changes(wrapped_trie_changes);
store_update.save_trie_changes(wrapped_trie_changes).unwrap();

prev_state_roots[shard_id as usize] = new_root;
trie_changes_shards.push(trie_changes_data);
2 changes: 1 addition & 1 deletion core/store/src/config.rs
@@ -102,7 +102,7 @@ impl Default for StoreConfig {
// we use it since then.
block_size: bytesize::ByteSize::kib(16),

trie_cache_capacities: Default::default(),
trie_cache_capacities: vec![(ShardUId { version: 1, shard_id: 3 }, 2_000_000)],
}
}
}
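
The default store config now pre-sizes the trie cache for shard 3 (version 1) at 2,000,000 entries instead of relying on Default::default(). Operators can still supply their own per-shard capacities; a rough sketch, assuming the field stays a plain public Vec on StoreConfig and using made-up capacity values:

    // Sketch: override per-shard trie cache capacities (numbers are illustrative).
    let mut store_config = StoreConfig::default();
    store_config.trie_cache_capacities = vec![
        (ShardUId { version: 1, shard_id: 3 }, 3_000_000),
        (ShardUId { version: 1, shard_id: 0 }, 1_000_000),
    ];
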
19 changes: 12 additions & 7 deletions core/store/src/lib.rs
@@ -356,6 +356,17 @@ impl StoreUpdate {
self.transaction.merge(other.transaction)
}

pub fn update_cache(&self) -> io::Result<()> {
if let Some(tries) = &self.shard_tries {
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
Ok(())
}

pub fn commit(self) -> io::Result<()> {
debug_assert!(
{
@@ -376,13 +387,7 @@
"Transaction overwrites itself: {:?}",
self
);
if let Some(tries) = self.shard_tries {
// Note: avoid comparing wide pointers here to work-around
// https://github.com/rust-lang/rust/issues/69757
let addr = |arc| Arc::as_ptr(arc) as *const u8;
assert_eq!(addr(&tries.get_store().storage), addr(&self.storage),);
tries.update_cache(&self.transaction)?;
}
self.update_cache()?;
let _span = tracing::trace_span!(target: "store", "commit").entered();
for op in &self.transaction.ops {
match op {
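
The assert_eq! inside update_cache compares Arc pointers through the addr closure rather than directly, to work around rust-lang/rust#69757: comparing fat pointers to a dyn trait object also compares vtable pointers, which can give surprising results for identity checks. A self-contained illustration of the same trick, with made-up types:

    use std::sync::Arc;

    trait Storage {}
    struct Db;
    impl Storage for Db {}

    // Compare only the data address of two Arc<dyn Trait> values, discarding
    // the vtable half of the fat pointer before the comparison.
    fn same_backing(a: &Arc<dyn Storage>, b: &Arc<dyn Storage>) -> bool {
        let addr = |arc: &Arc<dyn Storage>| Arc::as_ptr(arc) as *const u8;
        addr(a) == addr(b)
    }

    fn main() {
        let a: Arc<dyn Storage> = Arc::new(Db);
        let b = Arc::clone(&a);
        assert!(same_backing(&a, &b));
    }
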
109 changes: 108 additions & 1 deletion core/store/src/metrics.rs
@@ -1,4 +1,7 @@
use near_metrics::{try_create_histogram_vec, HistogramVec};
use near_metrics::{
try_create_histogram_vec, try_create_int_counter_vec, try_create_int_gauge_vec, HistogramVec,
IntCounterVec, IntGaugeVec,
};
use once_cell::sync::Lazy;

pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy<HistogramVec> = Lazy::new(|| {
@@ -10,3 +13,107 @@ pub(crate) static DATABASE_OP_LATENCY_HIST: Lazy<HistogramVec> = Lazy::new(|| {
)
.unwrap()
});

pub static CHUNK_CACHE_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_chunk_cache_hits",
"Chunk cache hits",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static CHUNK_CACHE_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_chunk_cache_misses",
"Chunk cache misses",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_hits",
"Shard cache hits",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_misses",
"Shard cache misses",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_TOO_LARGE: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_too_large",
"Number of values to be inserted into shard cache is too large",
&["shard_id", "is_view"],
)
.unwrap()
});

pub static SHARD_CACHE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec("near_shard_cache_size", "Shard cache size", &["shard_id", "is_view"])
.unwrap()
});

pub static CHUNK_CACHE_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec("near_chunk_cache_size", "Chunk cache size", &["shard_id", "is_view"])
.unwrap()
});

pub static SHARD_CACHE_POP_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_pop_hits",
"Shard cache pop hits",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_POP_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_pop_misses",
"Shard cache pop misses",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_GC_POP_MISSES: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_shard_cache_gc_pop_misses",
"Shard cache gc pop misses",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static SHARD_CACHE_DELETIONS_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_shard_cache_deletions_size",
"Shard cache deletions size",
&["shard_id", "is_view"],
)
.unwrap()
});
pub static APPLIED_TRIE_DELETIONS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_applied_trie_deletions",
"Applied deletions to trie",
&["shard_id"],
)
.unwrap()
});
pub static APPLIED_TRIE_INSERTIONS: Lazy<IntCounterVec> = Lazy::new(|| {
try_create_int_counter_vec(
"near_applied_trie_insertions",
"Applied insertions to trie",
&["shard_id"],
)
.unwrap()
});
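
All of these are standard Prometheus vectors keyed by shard (and, for the cache metrics, a view flag). Most of the call sites are not visible in this diff view; a hedged sketch of how one of them might be bumped (the exact label encoding is an assumption, not taken from the diff):

    // Sketch: record a shard-cache hit and publish the new cache size for
    // shard 3, non-view cache. Label encoding ("3", "0") is illustrative.
    let cache_len = 42_usize;
    metrics::SHARD_CACHE_HITS.with_label_values(&["3", "0"]).inc();
    metrics::SHARD_CACHE_SIZE.with_label_values(&["3", "0"]).set(cache_len as i64);
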
39 changes: 27 additions & 12 deletions core/store/src/trie/shard_tries.rs
@@ -21,7 +21,7 @@ use near_primitives::state_record::is_delayed_receipt_key;
use crate::flat_state::FlatState;
use crate::trie::trie_storage::{TrieCache, TrieCachingStorage};
use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR};
use crate::{DBCol, DBOp, DBTransaction};
use crate::{metrics, DBCol, DBOp, DBTransaction};
use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate};

/// Responsible for creation of trie caches, stores necessary configuration for it.
@@ -42,20 +42,24 @@ impl TrieCacheFactory {
}

/// Create new cache for the given shard uid.
pub fn create_cache(&self, shard_uid: &ShardUId) -> TrieCache {
match self.capacities.get(shard_uid) {
Some(capacity) => TrieCache::with_capacity(*capacity),
None => TrieCache::new(),
pub fn create_cache(&self, shard_uid: &ShardUId, is_view: bool) -> TrieCache {
let capacity = if is_view { None } else { self.capacities.get(shard_uid) };
match capacity {
Some(capacity) => TrieCache::with_capacities(*capacity, shard_uid.shard_id, is_view),
None => TrieCache::new(shard_uid.shard_id, is_view),
}
}

/// Create caches on the initialization of storage structures.
pub fn create_initial_caches(&self) -> HashMap<ShardUId, TrieCache> {
pub fn create_initial_caches(&self, is_view: bool) -> HashMap<ShardUId, TrieCache> {
assert_ne!(self.num_shards, 0);
let shards: Vec<_> = (0..self.num_shards)
.map(|shard_id| ShardUId { version: self.shard_version, shard_id: shard_id as u32 })
.collect();
shards.iter().map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid))).collect()
shards
.iter()
.map(|&shard_uid| (shard_uid, self.create_cache(&shard_uid, is_view)))
.collect()
}
}
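
create_cache now takes an is_view flag, and view caches deliberately ignore the per-shard capacity overrides, always falling back to TrieCache::new with its default capacity. A condensed restatement of that selection logic as a hypothetical free function (the concrete capacity type is assumed):

    use std::collections::HashMap;

    // Hypothetical helper mirroring the logic of create_cache above: only
    // non-view caches pick up a configured per-shard capacity.
    fn chosen_capacity(
        capacities: &HashMap<ShardUId, usize>,
        shard_uid: &ShardUId,
        is_view: bool,
    ) -> Option<usize> {
        if is_view {
            None
        } else {
            capacities.get(shard_uid).copied()
        }
    }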

@@ -73,8 +77,8 @@ pub struct ShardTries(Arc<ShardTriesInner>);

impl ShardTries {
pub fn new(store: Store, trie_cache_factory: TrieCacheFactory) -> Self {
let caches = trie_cache_factory.create_initial_caches();
let view_caches = trie_cache_factory.create_initial_caches();
let caches = trie_cache_factory.create_initial_caches(false);
let view_caches = trie_cache_factory.create_initial_caches(true);
ShardTries(Arc::new(ShardTriesInner {
store,
trie_cache_factory,
@@ -112,10 +116,11 @@ impl ShardTries {
let mut caches = caches_to_use.write().expect(POISONED_LOCK_ERR);
caches
.entry(shard_uid)
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid))
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, is_view))
.clone()
};
let storage = Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid));
let storage =
Box::new(TrieCachingStorage::new(self.0.store.clone(), cache, shard_uid, is_view));
let flat_state = {
#[cfg(feature = "protocol_feature_flat_state")]
if use_flat_state {
@@ -177,7 +182,7 @@ impl ShardTries {
for (shard_uid, ops) in shards {
let cache = caches
.entry(shard_uid)
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid))
.or_insert_with(|| self.0.trie_cache_factory.create_cache(&shard_uid, false))
.clone();
cache.update_cache(ops);
}
@@ -238,6 +243,9 @@ impl ShardTries {
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
metrics::APPLIED_TRIE_INSERTIONS
.with_label_values(&[&format!("{}", shard_uid.shard_id)])
.inc_by(trie_changes.insertions.len() as u64);
self.apply_insertions_inner(&trie_changes.insertions, shard_uid, store_update)
}

@@ -247,6 +255,9 @@
shard_uid: ShardUId,
store_update: &mut StoreUpdate,
) {
metrics::APPLIED_TRIE_DELETIONS
.with_label_values(&[&format!("{}", shard_uid.shard_id)])
.inc_by(trie_changes.deletions.len() as u64);
self.apply_deletions_inner(&trie_changes.deletions, shard_uid, store_update)
}

@@ -325,6 +336,10 @@ impl WrappedTrieChanges {
self.tries.apply_insertions(&self.trie_changes, self.shard_uid, store_update)
}

pub fn deletions_into(&self, store_update: &mut StoreUpdate) {
self.tries.apply_deletions(&self.trie_changes, self.shard_uid, store_update)
}

/// Save state changes into Store.
///
/// NOTE: the changes are drained from `self`.