From b0dae263dff63c705414f1c632ce5db31c3026c2 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 20 Mar 2022 14:18:44 +0800 Subject: [PATCH] Add iterator for `dash::Cache` --- src/dash.rs | 5 +- src/dash/base_cache.rs | 11 +++ src/dash/cache.rs | 156 +++++++++++++++++++++++++++++++++++++++-- src/dash/iter.rs | 46 ++++++++++++ src/dash/mapref.rs | 54 ++++++++++++++ 5 files changed, 265 insertions(+), 7 deletions(-) create mode 100644 src/dash/iter.rs create mode 100644 src/dash/mapref.rs diff --git a/src/dash.rs b/src/dash.rs index 5362959a..234c5c51 100644 --- a/src/dash.rs +++ b/src/dash.rs @@ -8,10 +8,13 @@ mod base_cache; mod builder; mod cache; -// mod value_initializer; +mod iter; +mod mapref; pub use builder::CacheBuilder; pub use cache::Cache; +pub use iter::Iter; +pub use mapref::EntryRef; /// Provides extra methods that will be useful for testing. pub trait ConcurrentCacheExt { diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index b8740eba..4e5899b8 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -1,3 +1,4 @@ +use super::Iter; use crate::{ common::{ self, @@ -14,6 +15,7 @@ use crate::{ AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, }, }; + use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; use dashmap::mapref::one::Ref as DashMapRef; @@ -183,6 +185,10 @@ where self.inner.set_valid_after(now); } + pub(crate) fn iter(&self) -> Iter<'_, K, V, S> { + self.inner.iter() + } + pub(crate) fn max_capacity(&self) -> Option { self.inner.max_capacity() } @@ -503,6 +509,11 @@ where .map(|(key, entry)| KvEntry::new(key, entry)) } + fn iter(&self) -> Iter<'_, K, V, S> { + let map_iter = self.cache.iter(); + Iter::new(map_iter) + } + fn max_capacity(&self) -> Option { self.max_capacity.map(|n| n as usize) } diff --git a/src/dash/cache.rs b/src/dash/cache.rs index 1934ecff..a9fe6356 100644 --- a/src/dash/cache.rs +++ b/src/dash/cache.rs @@ -1,6 +1,6 @@ use super::{ base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, - CacheBuilder, ConcurrentCacheExt, + CacheBuilder, ConcurrentCacheExt, Iter, }; use crate::sync::{housekeeper::InnerSync, Weigher, WriteOp}; @@ -15,11 +15,14 @@ use std::{ /// A thread-safe concurrent in-memory cache built upon [`dashmap::DashMap`][dashmap]. /// -/// `Cache` supports full concurrency of retrievals and a high expected concurrency -/// for updates. +/// Unlike `sync` and `future` caches of Moka, `dash` cache does not provide full +/// concurrency of retrievals. This is because `DashMap` employs read-write locks on +/// internal shards. /// -/// `Cache` utilizes a lock-free concurrent hash table as the central key-value -/// storage. `Cache` performs a best-effort bounding of the map using an entry +/// On the other hand, `dash` cache provids iterator, which returns immutable +/// references to the entries in a cache. +/// +/// `dash` cache performs a best-effort bounding of the map using an entry /// replacement algorithm to determine which entries to evict when the capacity is /// exceeded. /// @@ -355,6 +358,36 @@ where self.base.invalidate_all(); } + /// Creates an iterator over a `moka::dash::Cache` yielding immutable references. + /// + /// **Locking behavior**: This iterator relies on the iterator of + /// [`dashmap::DashMap`][dashmap-iter], which employs read-write locks. May + /// deadlock if the thread holding an iterator attempts to update the cache. + /// + /// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter + /// + /// # Examples + /// + /// ```rust + /// use moka::dash::Cache; + /// + /// let cache = Cache::new(100); + /// cache.insert("Julia", 14); + /// + /// let mut iter = cache.iter(); + /// let entry_ref = iter.next().unwrap(); + /// assert_eq!(entry_ref.pair(), (&"Julia", &14)); + /// assert_eq!(entry_ref.key(), &"Julia"); + /// assert_eq!(entry_ref.value(), &14); + /// assert_eq!(*entry_ref, 14); + /// + /// assert!(iter.next().is_none()); + /// ``` + /// + pub fn iter(&self) -> Iter<'_, K, V, S> { + self.base.iter() + } + /// Returns the `max_capacity` of this cache. pub fn max_capacity(&self) -> Option { self.base.max_capacity() @@ -453,7 +486,7 @@ mod tests { use super::{Cache, ConcurrentCacheExt}; use crate::common::time::Clock; - use std::time::Duration; + use std::{sync::Arc, time::Duration}; #[test] fn basic_single_thread() { @@ -737,4 +770,115 @@ mod tests { assert_eq!(cache.get_if_present(&"b"), None); assert!(cache.is_table_empty()); } + + #[test] + fn test_iter() { + const NUM_KEYS: usize = 50; + + fn make_value(key: usize) -> String { + format!("val: {}", key) + } + + let cache = Cache::builder() + .max_capacity(100) + .time_to_idle(Duration::from_secs(10)) + .build(); + + for key in 0..NUM_KEYS { + cache.insert(key, make_value(key)); + } + + let mut key_set = std::collections::HashSet::new(); + + for entry in cache.iter() { + let (key, value) = entry.pair(); + assert_eq!(value, &make_value(*key)); + + key_set.insert(*key); + } + + // Ensure there are no missing or duplicate keys in the iteration. + assert_eq!(key_set.len(), NUM_KEYS); + + // DO NOT REMOVE THE COMMENT FROM THIS BLOCK. + // This block demonstrates how you can write a code to get a deadlock. + // { + // let mut iter = cache.iter(); + // let _ = iter.next(); + + // for key in 0..NUM_KEYS { + // cache.insert(key, make_value(key)); + // println!("{}", key); + // } + + // let _ = iter.next(); + // } + } + + /// Runs 16 threads at the same time and ensures no deadlock occurs. + /// + /// - Eight of the threads will update key-values in the cache. + /// - Eight others will iterate the cache. + /// + #[test] + fn test_iter_multi_threads() { + const NUM_KEYS: usize = 1024; + + fn make_value(key: usize) -> String { + format!("val: {}", key) + } + + let cache = Cache::builder() + .max_capacity(2048) + .time_to_idle(Duration::from_secs(10)) + .build(); + + // Initialize the cache. + for key in 0..NUM_KEYS { + cache.insert(key, make_value(key)); + } + + let rw_lock = Arc::new(std::sync::RwLock::<()>::default()); + let write_lock = rw_lock.write().unwrap(); + + // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect + #[allow(clippy::needless_collect)] + let handles = (0..16usize) + .map(|n| { + let cache = cache.clone(); + let rw_lock = Arc::clone(&rw_lock); + + if n % 2 == 0 { + // This thread will update the cache. + std::thread::spawn(move || { + let read_lock = rw_lock.read().unwrap(); + for key in 0..NUM_KEYS { + // TODO: Update keys in a random order? + cache.insert(key, make_value(key)); + } + std::mem::drop(read_lock); + }) + } else { + // This thread will iterate the cache. + std::thread::spawn(move || { + let read_lock = rw_lock.read().unwrap(); + let mut key_set = std::collections::HashSet::new(); + for entry in cache.iter() { + let (key, value) = entry.pair(); + assert_eq!(value, &make_value(*key)); + key_set.insert(*key); + } + // Ensure there are no missing or duplicate keys in the iteration. + assert_eq!(key_set.len(), NUM_KEYS); + std::mem::drop(read_lock); + }) + } + }) + .collect::>(); + + // Let these threads to run by releasing the write lock. + std::mem::drop(write_lock); + + handles.into_iter().for_each(|h| h.join().expect("Failed")); + } } diff --git a/src/dash/iter.rs b/src/dash/iter.rs new file mode 100644 index 00000000..f625736e --- /dev/null +++ b/src/dash/iter.rs @@ -0,0 +1,46 @@ +use super::mapref::EntryRef; +use crate::sync::ValueEntry; + +use std::{ + hash::{BuildHasher, Hash}, + sync::Arc, +}; +use triomphe::Arc as TrioArc; + +type DashMapIter<'a, K, V, S> = dashmap::iter::Iter<'a, Arc, TrioArc>, S>; + +pub struct Iter<'a, K, V, S>(DashMapIter<'a, K, V, S>); + +impl<'a, K, V, S> Iter<'a, K, V, S> { + pub(crate) fn new(map_iter: DashMapIter<'a, K, V, S>) -> Self { + Self(map_iter) + } +} + +impl<'a, K, V, S> Iterator for Iter<'a, K, V, S> +where + K: Eq + Hash, + S: BuildHasher + Clone, +{ + type Item = EntryRef<'a, K, V, S>; + + fn next(&mut self) -> Option { + self.0.next().map(|map_ref| EntryRef::new(map_ref)) + } +} + +unsafe impl<'a, 'i, K, V, S> Send for Iter<'i, K, V, S> +where + K: 'a + Eq + Hash + Send, + V: 'a + Send, + S: 'a + BuildHasher + Clone, +{ +} + +unsafe impl<'a, 'i, K, V, S> Sync for Iter<'i, K, V, S> +where + K: 'a + Eq + Hash + Sync, + V: 'a + Sync, + S: 'a + BuildHasher + Clone, +{ +} diff --git a/src/dash/mapref.rs b/src/dash/mapref.rs new file mode 100644 index 00000000..8d94f52e --- /dev/null +++ b/src/dash/mapref.rs @@ -0,0 +1,54 @@ +use crate::sync::ValueEntry; + +use std::{ + hash::{BuildHasher, Hash}, + sync::Arc, +}; +use triomphe::Arc as TrioArc; + +type DashMapRef<'a, K, V, S> = + dashmap::mapref::multiple::RefMulti<'a, Arc, TrioArc>, S>; + +pub struct EntryRef<'a, K, V, S>(DashMapRef<'a, K, V, S>); + +unsafe impl<'a, K, V, S> Sync for EntryRef<'a, K, V, S> +where + K: Eq + Hash + Send + Sync, + V: Send + Sync, + S: BuildHasher, +{ +} + +impl<'a, K, V, S> EntryRef<'a, K, V, S> +where + K: Eq + Hash, + S: BuildHasher + Clone, +{ + pub(crate) fn new(map_ref: DashMapRef<'a, K, V, S>) -> Self { + Self(map_ref) + } + + pub fn key(&self) -> &K { + self.0.key() + } + + pub fn value(&self) -> &V { + &self.0.value().value + } + + pub fn pair(&self) -> (&K, &V) { + (self.key(), self.value()) + } +} + +impl<'a, K, V, S> std::ops::Deref for EntryRef<'a, K, V, S> +where + K: Eq + Hash, + S: BuildHasher + Clone, +{ + type Target = V; + + fn deref(&self) -> &V { + self.value() + } +}