Skip to content

Commit

Permalink
refactor: clean up busy waiting in prefetcher blocking get (#8215)
Browse files Browse the repository at this point in the history
Part of #7723

This PR replaces busy waiting loop with update notifications on every underlying map update. It introduces single condition variable which is notified on every map update. One considered alternative is to maintain separate condition variable per slot to have more granular notifications. In practice this doesn't make any difference since additional iterations for irrelevant keys wouldn't cause any noticeable performance overhead, but it results in more complex and harder to reason about code.
  • Loading branch information
pugachAG authored Dec 22, 2022
1 parent 4f8a2b4 commit 915b08b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 15 deletions.
1 change: 1 addition & 0 deletions core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub mod metadata;
mod metrics;
pub mod migrations;
mod opener;
mod sync_utils;
pub mod test_utils;
mod trie;

Expand Down
72 changes: 72 additions & 0 deletions core/store/src/sync_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::ops::{Deref, DerefMut};
use std::sync::{Condvar, Mutex, MutexGuard};

const POISONED_LOCK_ERR: &str = "The lock was poisoned.";

/// A convenience wrapper around a Mutex and a Condvar.
///
/// It enables blocking while waiting for the underlying value to be updated.
/// The implementation ensures that any modification results in all blocked
/// threads being notified.
pub(crate) struct Monitor<T> {
cvar: Condvar,
mutex: Mutex<T>,
}

pub(crate) struct MonitorReadGuard<'a, T> {
guard: MutexGuard<'a, T>,
}

pub(crate) struct MonitorWriteGuard<'a, T> {
guard: MutexGuard<'a, T>,
cvar: &'a Condvar,
}

impl<T> Monitor<T> {
pub fn new(t: T) -> Self {
Self { mutex: Mutex::new(t), cvar: Condvar::new() }
}

pub fn lock(&self) -> MonitorReadGuard<'_, T> {
let guard = self.mutex.lock().expect(POISONED_LOCK_ERR);
MonitorReadGuard { guard }
}

pub fn lock_mut(&self) -> MonitorWriteGuard<'_, T> {
let guard = self.mutex.lock().expect(POISONED_LOCK_ERR);
MonitorWriteGuard { guard, cvar: &self.cvar }
}

pub fn wait<'a>(&'a self, guard: MonitorReadGuard<'a, T>) -> MonitorReadGuard<'a, T> {
let guard = self.cvar.wait(guard.guard).expect(POISONED_LOCK_ERR);
MonitorReadGuard { guard }
}
}

impl<T> Deref for MonitorReadGuard<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}

impl<T> Deref for MonitorWriteGuard<'_, T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}

impl<T> DerefMut for MonitorWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.deref_mut()
}
}

impl<T> Drop for MonitorWriteGuard<'_, T> {
fn drop(&mut self) {
self.cvar.notify_all();
}
}
59 changes: 44 additions & 15 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::sync_utils::Monitor;
use crate::trie::POISONED_LOCK_ERR;
use crate::{
metrics, DBCol, StorageError, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig,
Expand All @@ -12,7 +13,7 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, ShardId, StateRoot, TrieNodesCount};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::thread;

const MAX_QUEUED_WORK_ITEMS: usize = 16 * 1024;
Expand Down Expand Up @@ -107,7 +108,7 @@ pub enum PrefetchError {
/// without the prefetcher, because the order in which it sees accesses is
/// independent of the prefetcher.
#[derive(Clone)]
pub(crate) struct PrefetchStagingArea(Arc<Mutex<InnerPrefetchStagingArea>>);
pub(crate) struct PrefetchStagingArea(Arc<Monitor<InnerPrefetchStagingArea>>);

struct InnerPrefetchStagingArea {
slots: SizeTrackedHashMap,
Expand Down Expand Up @@ -311,7 +312,7 @@ impl TriePrefetchingStorage {
impl PrefetchStagingArea {
fn new(shard_id: ShardId) -> Self {
let inner = InnerPrefetchStagingArea { slots: SizeTrackedHashMap::new(shard_id) };
Self(Arc::new(Mutex::new(inner)))
Self(Arc::new(Monitor::new(inner)))
}

/// Release a slot in the prefetcher staging area.
Expand All @@ -322,7 +323,7 @@ impl PrefetchStagingArea {
/// 2: IO thread misses in the shard cache on the same key and starts fetching it again.
/// 3: Main thread value is inserted in shard cache.
pub(crate) fn release(&self, key: &CryptoHash) {
let mut guard = self.lock();
let mut guard = self.0.lock_mut();
let dropped = guard.slots.remove(key);
// `Done` is the result after a successful prefetch.
// `PendingFetch` means the value has been read without a prefetch.
Expand All @@ -345,13 +346,14 @@ impl PrefetchStagingArea {
/// same data and thus are waiting on each other rather than the DB.
/// Of course, that would require prefetching to be moved into an async environment,
pub(crate) fn blocking_get(&self, key: CryptoHash) -> Option<Arc<[u8]>> {
let mut guard = self.0.lock();
loop {
match self.lock().slots.get(&key) {
match guard.slots.get(&key) {
Some(PrefetchSlot::Done(value)) => return Some(value.clone()),
Some(_) => (),
None => return None,
}
thread::sleep(std::time::Duration::from_micros(1));
guard = self.0.wait(guard);
}
}

Expand All @@ -362,7 +364,7 @@ impl PrefetchStagingArea {
}

fn insert_fetched(&self, key: CryptoHash, value: Arc<[u8]>) {
self.lock().slots.insert(key, PrefetchSlot::Done(value));
self.0.lock_mut().slots.insert(key, PrefetchSlot::Done(value));
}

/// Get prefetched value if available and otherwise atomically insert the
Expand All @@ -372,7 +374,7 @@ impl PrefetchStagingArea {
key: CryptoHash,
set_if_empty: PrefetchSlot,
) -> PrefetcherResult {
let mut guard = self.lock();
let mut guard = self.0.lock_mut();
let full =
guard.slots.size_bytes > MAX_PREFETCH_STAGING_MEMORY - PREFETCH_RESERVED_BYTES_PER_SLOT;
match guard.slots.map.get(&key) {
Expand All @@ -393,12 +395,7 @@ impl PrefetchStagingArea {
}

fn clear(&self) {
self.lock().slots.clear();
}

#[track_caller]
fn lock(&self) -> std::sync::MutexGuard<InnerPrefetchStagingArea> {
self.0.lock().expect(POISONED_LOCK_ERR)
self.0.lock_mut().slots.clear();
}
}

Expand Down Expand Up @@ -551,14 +548,15 @@ impl Drop for PrefetchingThreadsHandle {
/// a minimal set of functions is required to check the inner
/// state of the prefetcher.
#[cfg(feature = "test_features")]
mod tests {
mod tests_utils {
use super::{PrefetchApi, PrefetchSlot};
use crate::TrieCachingStorage;

impl PrefetchApi {
/// Returns the number of prefetched values currently staged.
pub fn num_prefetched_and_staged(&self) -> usize {
self.prefetching
.0
.lock()
.slots
.map
Expand All @@ -581,3 +579,34 @@ mod tests {
}
}
}

#[cfg(test)]
mod tests {
use super::{PrefetchStagingArea, PrefetcherResult};
use near_primitives::hash::CryptoHash;

#[test]
fn test_prefetch_staging_area_blocking_get_after_update() {
let key = CryptoHash::hash_bytes(&[1, 2, 3]);
let value: std::sync::Arc<[u8]> = vec![4, 5, 6].into();
let prefetch_staging_area = PrefetchStagingArea::new(0);
assert!(matches!(
prefetch_staging_area.get_or_set_fetching(key),
PrefetcherResult::SlotReserved
));
let prefetch_staging_area2 = prefetch_staging_area.clone();
let value2 = value.clone();
// We need to execute `blocking_get` before `insert_fetched` so that
// it blocks while waiting for the value to be updated. Spawning
// a new thread + yielding should give enough time for the main
// thread to make progress. Please note that even if `insert_fetched`
// is executed before `blocking_get`, that still wouldn't result in
// any flakiness since the test would still pass, it just won't verify
// the synchronization part of `blocking_get`.
std::thread::spawn(move || {
std::thread::yield_now();
prefetch_staging_area2.insert_fetched(key, value2);
});
assert_eq!(prefetch_staging_area.blocking_get(key), Some(value));
}
}

0 comments on commit 915b08b

Please sign in to comment.