feat: try reading shard_cache if prefetcher blocking_get returns None (#8287)

This is the second part of #7723: avoid an unnecessary storage lookup from the main thread in the fork scenario (a simplified sketch of the resulting lookup order follows below).
pugachAG authored Jan 4, 2023
1 parent 87ac804, commit 8c3dca9
Showing 3 changed files with 21 additions and 5 deletions.
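
At a high level, the change means the main thread now checks the shard cache before falling back to a database read when the prefetcher's blocking_get returns None. Below is a minimal, hedged sketch of that lookup order only, not the nearcore implementation: the Caches struct, the retrieve function, and the plain Mutex<HashMap> stand-ins for the prefetch staging area, shard cache, and store are all illustrative assumptions.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

type Hash = [u8; 32];
type Value = Vec<u8>;

// Stand-ins for the prefetch staging area and the shard cache.
struct Caches {
    prefetched: Mutex<HashMap<Hash, Value>>,
    shard_cache: Mutex<HashMap<Hash, Value>>,
}

impl Caches {
    // Lookup order on the main thread after this commit:
    // 1. value delivered by the prefetcher,
    // 2. value already placed in the shard cache by another fork's main thread,
    // 3. only then a database read.
    fn retrieve(&self, hash: &Hash, read_from_db: impl FnOnce(&Hash) -> Value) -> Value {
        if let Some(value) = self.prefetched.lock().unwrap().remove(hash) {
            return value;
        }
        if let Some(value) = self.shard_cache.lock().unwrap().get(hash).cloned() {
            // Fork conflict: the other main thread consumed the prefetch slot
            // and populated the shard cache, so no storage lookup is needed.
            return value;
        }
        read_from_db(hash)
    }
}

fn main() {
    let caches = Caches {
        prefetched: Mutex::new(HashMap::new()),
        shard_cache: Mutex::new(HashMap::new()),
    };
    let hash = [0u8; 32];
    caches.shard_cache.lock().unwrap().insert(hash, b"node".to_vec());
    // The DB closure is never called: the shard cache already has the value.
    let value = caches.retrieve(&hash, |_hash: &Hash| panic!("unexpected DB read"));
    assert_eq!(value, b"node".to_vec());
}
```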
core/store/src/metrics.rs (8 additions & 0 deletions)
@@ -181,6 +181,14 @@ pub static PREFETCH_MEMORY_LIMIT_REACHED: Lazy<IntCounterVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+pub static PREFETCH_CONFLICT: Lazy<IntCounterVec> = Lazy::new(|| {
+    try_create_int_counter_vec(
+        "near_prefetch_conflict",
+        "Main thread retrieved value from shard_cache after a conflict with another main thread from a fork.",
+        &["shard_id"],
+    )
+    .unwrap()
+});
 pub static PREFETCH_RETRY: Lazy<IntCounterVec> = Lazy::new(|| {
     try_create_int_counter_vec(
         "near_prefetch_retries",
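
For reference, the sketch below shows how a per-shard counter like near_prefetch_conflict can be created and incremented with the prometheus crate directly; treating try_create_int_counter_vec as a thin wrapper that also registers the metric is an assumption here, not something this diff shows.

```rust
use prometheus::{IntCounterVec, Opts};

fn main() {
    // Assumed to mirror what `try_create_int_counter_vec` builds under the hood.
    let prefetch_conflict = IntCounterVec::new(
        Opts::new(
            "near_prefetch_conflict",
            "Main thread retrieved value from shard_cache after a conflict \
             with another main thread from a fork.",
        ),
        &["shard_id"],
    )
    .unwrap();

    // One counter instance per shard, selected by the `shard_id` label value.
    prefetch_conflict.with_label_values(&["0"]).inc();
    assert_eq!(prefetch_conflict.with_label_values(&["0"]).get(), 1);
}
```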
core/store/src/trie/prefetching_trie_storage.rs (1 addition & 2 deletions)
@@ -269,8 +269,7 @@ impl TrieStorage for TriePrefetchingStorage {
             .or_else(|| {
                 // `blocking_get` will return None if the prefetch slot has been removed
                 // by the main thread and the value inserted into the shard cache.
-                let mut guard = self.shard_cache.lock();
-                guard.get(hash)
+                self.shard_cache.get(hash)
             })
             .ok_or_else(|| {
                 // This could only happen if this thread started prefetching a value
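
The one-line replacement above suggests the shard cache exposes a get method that handles locking internally, so callers no longer hold an explicit guard. A hedged sketch of such an accessor follows; the ShardCache type here is a stand-in (the real cache is an LRU with size accounting, not a plain HashMap).

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Hash = [u8; 32];

// Illustrative stand-in: interior mutability behind a Mutex, with `get`
// doing the lock/lookup/clone/unlock for the caller.
#[derive(Clone, Default)]
struct ShardCache(Arc<Mutex<HashMap<Hash, Arc<[u8]>>>>);

impl ShardCache {
    fn get(&self, hash: &Hash) -> Option<Arc<[u8]>> {
        // The guard is dropped when this expression ends, releasing the lock.
        self.0.lock().unwrap().get(hash).cloned()
    }
}

fn main() {
    let cache = ShardCache::default();
    cache.0.lock().unwrap().insert([0u8; 32], Arc::from(&b"node"[..]));
    assert!(cache.get(&[0u8; 32]).is_some());
}
```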
core/store/src/trie/trie_storage.rs (12 additions & 3 deletions)
@@ -414,6 +414,7 @@ struct TrieCacheInnerMetrics {
     prefetch_not_requested: GenericCounter<prometheus::core::AtomicU64>,
     prefetch_memory_limit_reached: GenericCounter<prometheus::core::AtomicU64>,
     prefetch_retry: GenericCounter<prometheus::core::AtomicU64>,
+    prefetch_conflict: GenericCounter<prometheus::core::AtomicU64>,
 }
 
 impl TrieCachingStorage {
@@ -447,6 +448,7 @@ impl TrieCachingStorage {
             prefetch_memory_limit_reached: metrics::PREFETCH_MEMORY_LIMIT_REACHED
                 .with_label_values(&metrics_labels[..1]),
             prefetch_retry: metrics::PREFETCH_RETRY.with_label_values(&metrics_labels[..1]),
+            prefetch_conflict: metrics::PREFETCH_CONFLICT.with_label_values(&metrics_labels[..1]),
         };
         TrieCachingStorage {
             store,
@@ -560,10 +562,17 @@ impl TrieStorage for TrieCachingStorage {
                     // therefore blocking read will usually not return empty unless there
                     // was a storage error. Or in the case of forks and parallel chunk
                     // processing where one chunk cleans up prefetched data from the other.
-                    // In any case, we can try again from the main thread.
+                    // So first check whether the data was inserted into shard_cache by
+                    // the main thread from another fork, and only if that fails, fetch
+                    // the data from the DB.
                     None => {
-                        self.metrics.prefetch_retry.inc();
-                        self.read_from_db(hash)?
+                        if let Some(value) = self.shard_cache.get(hash) {
+                            self.metrics.prefetch_conflict.inc();
+                            value
+                        } else {
+                            self.metrics.prefetch_retry.inc();
+                            self.read_from_db(hash)?
+                        }
                     }
                 }
             }
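
To keep the two counters straight: near_prefetch_conflict counts the case where the value turns up in the shard cache after a fork conflict, while near_prefetch_retries counts the fall-through to a database read. A hedged restatement of that decision as a standalone function, with closures standing in for the real cache and DB accesses:

```rust
// Which path the main thread takes after `blocking_get` returns None.
enum FallbackPath {
    // Value found in the shard cache: another fork's main thread already
    // consumed the prefetch slot (counted by `near_prefetch_conflict`).
    ShardCacheHit(Vec<u8>),
    // Value re-read from the database (counted by `near_prefetch_retries`).
    DbRetry(Vec<u8>),
}

fn after_blocking_get_none(
    shard_cache_get: impl FnOnce() -> Option<Vec<u8>>,
    read_from_db: impl FnOnce() -> Vec<u8>,
) -> FallbackPath {
    match shard_cache_get() {
        Some(value) => FallbackPath::ShardCacheHit(value),
        None => FallbackPath::DbRetry(read_from_db()),
    }
}

fn main() {
    // Conflict case: the shard cache already holds the value, so no DB read.
    match after_blocking_get_none(|| Some(b"node".to_vec()), || unreachable!()) {
        FallbackPath::ShardCacheHit(v) => assert_eq!(v, b"node".to_vec()),
        FallbackPath::DbRetry(_) => panic!("should not hit the DB"),
    }
}
```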
