Skip to content

Commit

Permalink
Add iterator for dash::Cache
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya6502 committed Mar 20, 2022
1 parent 859ab4f commit b0dae26
Show file tree
Hide file tree
Showing 5 changed files with 265 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/dash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
mod base_cache;
mod builder;
mod cache;
// mod value_initializer;
mod iter;
mod mapref;

pub use builder::CacheBuilder;
pub use cache::Cache;
pub use iter::Iter;
pub use mapref::EntryRef;

/// Provides extra methods that will be useful for testing.
pub trait ConcurrentCacheExt<K, V> {
Expand Down
11 changes: 11 additions & 0 deletions src/dash/base_cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Iter;
use crate::{
common::{
self,
Expand All @@ -14,6 +15,7 @@ use crate::{
AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp,
},
};

use crossbeam_channel::{Receiver, Sender, TrySendError};
use crossbeam_utils::atomic::AtomicCell;
use dashmap::mapref::one::Ref as DashMapRef;
Expand Down Expand Up @@ -183,6 +185,10 @@ where
self.inner.set_valid_after(now);
}

pub(crate) fn iter(&self) -> Iter<'_, K, V, S> {
self.inner.iter()
}

pub(crate) fn max_capacity(&self) -> Option<usize> {
self.inner.max_capacity()
}
Expand Down Expand Up @@ -503,6 +509,11 @@ where
.map(|(key, entry)| KvEntry::new(key, entry))
}

fn iter(&self) -> Iter<'_, K, V, S> {
let map_iter = self.cache.iter();
Iter::new(map_iter)
}

fn max_capacity(&self) -> Option<usize> {
self.max_capacity.map(|n| n as usize)
}
Expand Down
156 changes: 150 additions & 6 deletions src/dash/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
CacheBuilder, ConcurrentCacheExt,
CacheBuilder, ConcurrentCacheExt, Iter,
};
use crate::sync::{housekeeper::InnerSync, Weigher, WriteOp};

Expand All @@ -15,11 +15,14 @@ use std::{

/// A thread-safe concurrent in-memory cache built upon [`dashmap::DashMap`][dashmap].
///
/// `Cache` supports full concurrency of retrievals and a high expected concurrency
/// for updates.
/// Unlike `sync` and `future` caches of Moka, `dash` cache does not provide full
/// concurrency of retrievals. This is because `DashMap` employs read-write locks on
/// internal shards.
///
/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
/// storage. `Cache` performs a best-effort bounding of the map using an entry
/// On the other hand, `dash` cache provids iterator, which returns immutable
/// references to the entries in a cache.
///
/// `dash` cache performs a best-effort bounding of the map using an entry
/// replacement algorithm to determine which entries to evict when the capacity is
/// exceeded.
///
Expand Down Expand Up @@ -355,6 +358,36 @@ where
self.base.invalidate_all();
}

/// Creates an iterator over a `moka::dash::Cache` yielding immutable references.
///
/// **Locking behavior**: This iterator relies on the iterator of
/// [`dashmap::DashMap`][dashmap-iter], which employs read-write locks. May
/// deadlock if the thread holding an iterator attempts to update the cache.
///
/// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter
///
/// # Examples
///
/// ```rust
/// use moka::dash::Cache;
///
/// let cache = Cache::new(100);
/// cache.insert("Julia", 14);
///
/// let mut iter = cache.iter();
/// let entry_ref = iter.next().unwrap();
/// assert_eq!(entry_ref.pair(), (&"Julia", &14));
/// assert_eq!(entry_ref.key(), &"Julia");
/// assert_eq!(entry_ref.value(), &14);
/// assert_eq!(*entry_ref, 14);
///
/// assert!(iter.next().is_none());
/// ```
///
pub fn iter(&self) -> Iter<'_, K, V, S> {
self.base.iter()
}

/// Returns the `max_capacity` of this cache.
pub fn max_capacity(&self) -> Option<usize> {
self.base.max_capacity()
Expand Down Expand Up @@ -453,7 +486,7 @@ mod tests {
use super::{Cache, ConcurrentCacheExt};
use crate::common::time::Clock;

use std::time::Duration;
use std::{sync::Arc, time::Duration};

#[test]
fn basic_single_thread() {
Expand Down Expand Up @@ -737,4 +770,115 @@ mod tests {
assert_eq!(cache.get_if_present(&"b"), None);
assert!(cache.is_table_empty());
}

#[test]
fn test_iter() {
const NUM_KEYS: usize = 50;

fn make_value(key: usize) -> String {
format!("val: {}", key)
}

let cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.build();

for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}

let mut key_set = std::collections::HashSet::new();

for entry in cache.iter() {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));

key_set.insert(*key);
}

// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);

// DO NOT REMOVE THE COMMENT FROM THIS BLOCK.
// This block demonstrates how you can write a code to get a deadlock.
// {
// let mut iter = cache.iter();
// let _ = iter.next();

// for key in 0..NUM_KEYS {
// cache.insert(key, make_value(key));
// println!("{}", key);
// }

// let _ = iter.next();
// }
}

/// Runs 16 threads at the same time and ensures no deadlock occurs.
///
/// - Eight of the threads will update key-values in the cache.
/// - Eight others will iterate the cache.
///
#[test]
fn test_iter_multi_threads() {
const NUM_KEYS: usize = 1024;

fn make_value(key: usize) -> String {
format!("val: {}", key)
}

let cache = Cache::builder()
.max_capacity(2048)
.time_to_idle(Duration::from_secs(10))
.build();

// Initialize the cache.
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}

let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
let write_lock = rw_lock.write().unwrap();

// https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
#[allow(clippy::needless_collect)]
let handles = (0..16usize)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);

if n % 2 == 0 {
// This thread will update the cache.
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
for key in 0..NUM_KEYS {
// TODO: Update keys in a random order?
cache.insert(key, make_value(key));
}
std::mem::drop(read_lock);
})
} else {
// This thread will iterate the cache.
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
let mut key_set = std::collections::HashSet::new();
for entry in cache.iter() {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));
key_set.insert(*key);
}
// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);
std::mem::drop(read_lock);
})
}
})
.collect::<Vec<_>>();

// Let these threads to run by releasing the write lock.
std::mem::drop(write_lock);

handles.into_iter().for_each(|h| h.join().expect("Failed"));
}
}
46 changes: 46 additions & 0 deletions src/dash/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use super::mapref::EntryRef;
use crate::sync::ValueEntry;

use std::{
hash::{BuildHasher, Hash},
sync::Arc,
};
use triomphe::Arc as TrioArc;

type DashMapIter<'a, K, V, S> = dashmap::iter::Iter<'a, Arc<K>, TrioArc<ValueEntry<K, V>>, S>;

pub struct Iter<'a, K, V, S>(DashMapIter<'a, K, V, S>);

impl<'a, K, V, S> Iter<'a, K, V, S> {
pub(crate) fn new(map_iter: DashMapIter<'a, K, V, S>) -> Self {
Self(map_iter)
}
}

impl<'a, K, V, S> Iterator for Iter<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
type Item = EntryRef<'a, K, V, S>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|map_ref| EntryRef::new(map_ref))
}
}

unsafe impl<'a, 'i, K, V, S> Send for Iter<'i, K, V, S>
where
K: 'a + Eq + Hash + Send,
V: 'a + Send,
S: 'a + BuildHasher + Clone,
{
}

unsafe impl<'a, 'i, K, V, S> Sync for Iter<'i, K, V, S>
where
K: 'a + Eq + Hash + Sync,
V: 'a + Sync,
S: 'a + BuildHasher + Clone,
{
}
54 changes: 54 additions & 0 deletions src/dash/mapref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::sync::ValueEntry;

use std::{
hash::{BuildHasher, Hash},
sync::Arc,
};
use triomphe::Arc as TrioArc;

type DashMapRef<'a, K, V, S> =
dashmap::mapref::multiple::RefMulti<'a, Arc<K>, TrioArc<ValueEntry<K, V>>, S>;

pub struct EntryRef<'a, K, V, S>(DashMapRef<'a, K, V, S>);

unsafe impl<'a, K, V, S> Sync for EntryRef<'a, K, V, S>
where
K: Eq + Hash + Send + Sync,
V: Send + Sync,
S: BuildHasher,
{
}

impl<'a, K, V, S> EntryRef<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
pub(crate) fn new(map_ref: DashMapRef<'a, K, V, S>) -> Self {
Self(map_ref)
}

pub fn key(&self) -> &K {
self.0.key()
}

pub fn value(&self) -> &V {
&self.0.value().value
}

pub fn pair(&self) -> (&K, &V) {
(self.key(), self.value())
}
}

impl<'a, K, V, S> std::ops::Deref for EntryRef<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
type Target = V;

fn deref(&self) -> &V {
self.value()
}
}

0 comments on commit b0dae26

Please sign in to comment.