Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iterator for dash::Cache #101

Merged
merged 3 commits into from
Mar 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

### Added

- Add a synchronous cache `moka::dash::Cache`, which uses `dash::DashMap` as
the internal storage. ([#99][gh-pull-0099])
#### Experimental Additions

Please note that the following additions are highly experimental so their APIs will
be frequently changed in next few releases.

- Add a synchronous cache `moka::dash::Cache`, which uses `dashmap::DashMap` as the
internal storage. ([#99][gh-pull-0099])
- Add iterator to `moka::dash::Cache`. ([#101][gh-pull-0101])

### Changed

Expand Down Expand Up @@ -249,6 +255,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
[gh-issue-0038]: https://github.com/moka-rs/moka/issues/38/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0101]: https://github.com/moka-rs/moka/pull/101/
[gh-pull-0100]: https://github.com/moka-rs/moka/pull/100/
[gh-pull-0099]: https://github.com/moka-rs/moka/pull/99/
[gh-pull-0086]: https://github.com/moka-rs/moka/pull/86/
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ default = ["atomic64"]
# Enable this feature to use `moka::future::Cache`.
future = ["async-io", "async-lock", "futures-util"]

# Enable this feature to use `moka::dash::Cache`.
# Enable this feature to use **experimental** `moka::dash::Cache`.
# Please note that the APIs for this feature will be frequently changed in next
# few releases.
dash = ["dashmap"]

# This feature is enabled by default. Disable it when the target platform does not
Expand Down
9 changes: 6 additions & 3 deletions src/dash.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Provides a thread-safe, concurrent cache implementation built upon
//! [`dashmap::DashMap`][dashmap].
//! **Experimental**: Provides a thread-safe, concurrent cache implementation
//! built upon [`dashmap::DashMap`][dashmap].
//!
//! To use this module, enable a crate feature called "dash".
//!
Expand All @@ -8,10 +8,13 @@
mod base_cache;
mod builder;
mod cache;
// mod value_initializer;
mod iter;
mod mapref;

pub use builder::CacheBuilder;
pub use cache::Cache;
pub use iter::Iter;
pub use mapref::EntryRef;

/// Provides extra methods that will be useful for testing.
pub trait ConcurrentCacheExt<K, V> {
Expand Down
11 changes: 11 additions & 0 deletions src/dash/base_cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::Iter;
use crate::{
common::{
self,
Expand All @@ -14,6 +15,7 @@ use crate::{
AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp,
},
};

use crossbeam_channel::{Receiver, Sender, TrySendError};
use crossbeam_utils::atomic::AtomicCell;
use dashmap::mapref::one::Ref as DashMapRef;
Expand Down Expand Up @@ -183,6 +185,10 @@ where
self.inner.set_valid_after(now);
}

pub(crate) fn iter(&self) -> Iter<'_, K, V, S> {
self.inner.iter()
}

pub(crate) fn max_capacity(&self) -> Option<usize> {
self.inner.max_capacity()
}
Expand Down Expand Up @@ -503,6 +509,11 @@ where
.map(|(key, entry)| KvEntry::new(key, entry))
}

fn iter(&self) -> Iter<'_, K, V, S> {
let map_iter = self.cache.iter();
Iter::new(map_iter)
}

fn max_capacity(&self) -> Option<usize> {
self.max_capacity.map(|n| n as usize)
}
Expand Down
163 changes: 155 additions & 8 deletions src/dash/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
CacheBuilder, ConcurrentCacheExt,
CacheBuilder, ConcurrentCacheExt, Iter,
};
use crate::sync::{housekeeper::InnerSync, Weigher, WriteOp};

Expand All @@ -13,17 +13,23 @@ use std::{
time::Duration,
};

/// A thread-safe concurrent in-memory cache built upon [`dashmap::DashMap`][dashmap].
/// **Experimental**: A thread-safe concurrent in-memory cache built upon
/// [`dashmap::DashMap`][dashmap].
///
/// `Cache` supports full concurrency of retrievals and a high expected concurrency
/// for updates.
/// Unlike `sync` and `future` caches of Moka, `dash` cache does not provide full
/// concurrency of retrievals. This is because `DashMap` employs read-write locks on
/// internal shards.
///
/// `Cache` utilizes a lock-free concurrent hash table as the central key-value
/// storage. `Cache` performs a best-effort bounding of the map using an entry
/// On the other hand, `dash` cache provids iterator, which returns immutable
/// references to the entries in a cache.
///
/// `dash` cache performs a best-effort bounding of the map using an entry
/// replacement algorithm to determine which entries to evict when the capacity is
/// exceeded.
///
/// To use this cache, enable a crate feature called "dash".
/// To use this cache, enable a crate feature called "dash". Please note that the APIs
/// will _be changed very often_ in next few releases as this is yet an experimental
/// feature.
///
/// # Examples
///
Expand Down Expand Up @@ -355,6 +361,36 @@ where
self.base.invalidate_all();
}

/// Creates an iterator over a `moka::dash::Cache` yielding immutable references.
///
/// **Locking behavior**: This iterator relies on the iterator of
/// [`dashmap::DashMap`][dashmap-iter], which employs read-write locks. May
/// deadlock if the thread holding an iterator attempts to update the cache.
///
/// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter
///
/// # Examples
///
/// ```rust
/// use moka::dash::Cache;
///
/// let cache = Cache::new(100);
/// cache.insert("Julia", 14);
///
/// let mut iter = cache.iter();
/// let entry_ref = iter.next().unwrap();
/// assert_eq!(entry_ref.pair(), (&"Julia", &14));
/// assert_eq!(entry_ref.key(), &"Julia");
/// assert_eq!(entry_ref.value(), &14);
/// assert_eq!(*entry_ref, 14);
///
/// assert!(iter.next().is_none());
/// ```
///
pub fn iter(&self) -> Iter<'_, K, V, S> {
self.base.iter()
}

/// Returns the `max_capacity` of this cache.
pub fn max_capacity(&self) -> Option<usize> {
self.base.max_capacity()
Expand Down Expand Up @@ -453,7 +489,7 @@ mod tests {
use super::{Cache, ConcurrentCacheExt};
use crate::common::time::Clock;

use std::time::Duration;
use std::{sync::Arc, time::Duration};

#[test]
fn basic_single_thread() {
Expand Down Expand Up @@ -737,4 +773,115 @@ mod tests {
assert_eq!(cache.get_if_present(&"b"), None);
assert!(cache.is_table_empty());
}

#[test]
fn test_iter() {
const NUM_KEYS: usize = 50;

fn make_value(key: usize) -> String {
format!("val: {}", key)
}

let cache = Cache::builder()
.max_capacity(100)
.time_to_idle(Duration::from_secs(10))
.build();

for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}

let mut key_set = std::collections::HashSet::new();

for entry in cache.iter() {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));

key_set.insert(*key);
}

// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);

// DO NOT REMOVE THE COMMENT FROM THIS BLOCK.
// This block demonstrates how you can write a code to get a deadlock.
// {
// let mut iter = cache.iter();
// let _ = iter.next();

// for key in 0..NUM_KEYS {
// cache.insert(key, make_value(key));
// println!("{}", key);
// }

// let _ = iter.next();
// }
}

/// Runs 16 threads at the same time and ensures no deadlock occurs.
///
/// - Eight of the threads will update key-values in the cache.
/// - Eight others will iterate the cache.
///
#[test]
fn test_iter_multi_threads() {
const NUM_KEYS: usize = 1024;

fn make_value(key: usize) -> String {
format!("val: {}", key)
}

let cache = Cache::builder()
.max_capacity(2048)
.time_to_idle(Duration::from_secs(10))
.build();

// Initialize the cache.
for key in 0..NUM_KEYS {
cache.insert(key, make_value(key));
}

let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
let write_lock = rw_lock.write().unwrap();

// https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
#[allow(clippy::needless_collect)]
let handles = (0..16usize)
.map(|n| {
let cache = cache.clone();
let rw_lock = Arc::clone(&rw_lock);

if n % 2 == 0 {
// This thread will update the cache.
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
for key in 0..NUM_KEYS {
// TODO: Update keys in a random order?
cache.insert(key, make_value(key));
}
std::mem::drop(read_lock);
})
} else {
// This thread will iterate the cache.
std::thread::spawn(move || {
let read_lock = rw_lock.read().unwrap();
let mut key_set = std::collections::HashSet::new();
for entry in cache.iter() {
let (key, value) = entry.pair();
assert_eq!(value, &make_value(*key));
key_set.insert(*key);
}
// Ensure there are no missing or duplicate keys in the iteration.
assert_eq!(key_set.len(), NUM_KEYS);
std::mem::drop(read_lock);
})
}
})
.collect::<Vec<_>>();

// Let these threads to run by releasing the write lock.
std::mem::drop(write_lock);

handles.into_iter().for_each(|h| h.join().expect("Failed"));
}
}
46 changes: 46 additions & 0 deletions src/dash/iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use super::mapref::EntryRef;
use crate::sync::ValueEntry;

use std::{
hash::{BuildHasher, Hash},
sync::Arc,
};
use triomphe::Arc as TrioArc;

type DashMapIter<'a, K, V, S> = dashmap::iter::Iter<'a, Arc<K>, TrioArc<ValueEntry<K, V>>, S>;

pub struct Iter<'a, K, V, S>(DashMapIter<'a, K, V, S>);

impl<'a, K, V, S> Iter<'a, K, V, S> {
pub(crate) fn new(map_iter: DashMapIter<'a, K, V, S>) -> Self {
Self(map_iter)
}
}

impl<'a, K, V, S> Iterator for Iter<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
type Item = EntryRef<'a, K, V, S>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|map_ref| EntryRef::new(map_ref))
}
}

unsafe impl<'a, 'i, K, V, S> Send for Iter<'i, K, V, S>
where
K: 'a + Eq + Hash + Send,
V: 'a + Send,
S: 'a + BuildHasher + Clone,
{
}

unsafe impl<'a, 'i, K, V, S> Sync for Iter<'i, K, V, S>
where
K: 'a + Eq + Hash + Sync,
V: 'a + Sync,
S: 'a + BuildHasher + Clone,
{
}
54 changes: 54 additions & 0 deletions src/dash/mapref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use crate::sync::ValueEntry;

use std::{
hash::{BuildHasher, Hash},
sync::Arc,
};
use triomphe::Arc as TrioArc;

type DashMapRef<'a, K, V, S> =
dashmap::mapref::multiple::RefMulti<'a, Arc<K>, TrioArc<ValueEntry<K, V>>, S>;

pub struct EntryRef<'a, K, V, S>(DashMapRef<'a, K, V, S>);

unsafe impl<'a, K, V, S> Sync for EntryRef<'a, K, V, S>
where
K: Eq + Hash + Send + Sync,
V: Send + Sync,
S: BuildHasher,
{
}

impl<'a, K, V, S> EntryRef<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
pub(crate) fn new(map_ref: DashMapRef<'a, K, V, S>) -> Self {
Self(map_ref)
}

pub fn key(&self) -> &K {
self.0.key()
}

pub fn value(&self) -> &V {
&self.0.value().value
}

pub fn pair(&self) -> (&K, &V) {
(self.key(), self.value())
}
}

impl<'a, K, V, S> std::ops::Deref for EntryRef<'a, K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
type Target = V;

fn deref(&self) -> &V {
self.value()
}
}
Loading