diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea44f00f..f233aca9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
 # Moka &mdash; Change Log
 
+## Version 0.8.2
+
+### Added
+
+- Add iterators to the following caches: ([#114][gh-pull-0114])
+    - `sync::Cache`
+    - `sync::SegmentedCache`
+    - `future::Cache`
+    - `unsync::Cache`
+- Implement `IntoIterator` for all the caches (including the experimental
+  `dash::Cache`) ([#114][gh-pull-0114])
+
+
 ## Version 0.8.1
 
 ### Added
@@ -287,6 +300,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
 [gh-issue-0038]: https://github.com/moka-rs/moka/issues/38/
 [gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/
 
+[gh-pull-0114]: https://github.com/moka-rs/moka/pull/114/
 [gh-pull-0105]: https://github.com/moka-rs/moka/pull/105/
 [gh-pull-0104]: https://github.com/moka-rs/moka/pull/104/
 [gh-pull-0103]: https://github.com/moka-rs/moka/pull/103/
diff --git a/Cargo.toml b/Cargo.toml
index 14996701..840b5701 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "moka"
-version = "0.8.1"
+version = "0.8.2"
 edition = "2018"
 rust-version = "1.51"
diff --git a/src/cht/map/bucket.rs b/src/cht/map/bucket.rs
index 18ce21b4..a7faba36 100644
--- a/src/cht/map/bucket.rs
+++ b/src/cht/map/bucket.rs
@@ -334,6 +334,33 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
 
         loop_result.returned().flatten()
     }
+
+    pub(crate) fn keys<F, T>(
+        &self,
+        guard: &'g Guard,
+        with_key: &mut F,
+    ) -> Result<Vec<T>, RelocatedError>
+    where
+        F: FnMut(&K) -> T,
+    {
+        let mut keys = Vec::new();
+
+        for bucket in self.buckets.iter() {
+            let bucket_ptr = bucket.load_consume(guard);
+
+            if is_sentinel(bucket_ptr) {
+                return Err(RelocatedError);
+            }
+
+            if let Some(bucket_ref) = unsafe { bucket_ptr.as_ref() } {
+                if !is_tombstone(bucket_ptr) {
+                    keys.push(with_key(&bucket_ref.key));
+                }
+            }
+        }
+
+        Ok(keys)
+    }
 }
 
 impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
diff --git a/src/cht/map/bucket_array_ref.rs b/src/cht/map/bucket_array_ref.rs
index 18c9493e..9937e0d5 100644
--- a/src/cht/map/bucket_array_ref.rs
+++ b/src/cht/map/bucket_array_ref.rs
@@ -266,6 +266,34 @@ where
 
         result
     }
+
+    pub(crate) fn keys<F, T>(&self, mut with_key: F) -> Vec<T>
+    where
+        F: FnMut(&K) -> T,
+    {
+        let guard = &crossbeam_epoch::pin();
+        let current_ref = self.get(guard);
+        let mut bucket_array_ref = current_ref;
+
+        let result;
+
+        loop {
+            match bucket_array_ref.keys(guard, &mut with_key) {
+                Ok(keys) => {
+                    result = keys;
+                    break;
+                }
+                Err(_) => {
+                    bucket_array_ref =
+                        bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
+                }
+            }
+        }
+
+        self.swing(guard, current_ref, bucket_array_ref);
+
+        result
+    }
 }
 
 impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
diff --git a/src/cht/segment.rs b/src/cht/segment.rs
index 11571a8c..888e0a58 100644
--- a/src/cht/segment.rs
+++ b/src/cht/segment.rs
@@ -530,6 +530,28 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
         result
     }
 
+    pub(crate) fn keys<F, T>(&self, segment: usize, with_key: F) -> Option<Vec<T>>
+    where
+        F: FnMut(&K) -> T,
+    {
+        if segment >= self.segments.len() {
+            return None;
+        }
+
+        let Segment {
+            ref bucket_array,
+            ref len,
+        } = self.segments[segment];
+
+        let bucket_array_ref = BucketArrayRef {
+            bucket_array,
+            build_hasher: &self.build_hasher,
+            len,
+        };
+
+        Some(bucket_array_ref.keys(with_key))
+    }
+
     #[inline]
     pub(crate) fn hash<Q>(&self, key: &Q) -> u64
     where
@@ -538,6 +560,10 @@ impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
     {
         bucket::hash(&self.build_hasher, key)
     }
+
+    pub(crate) fn actual_num_segments(&self) -> usize {
+        self.segments.len()
+    }
 }
 
 impl<K, V, S> Drop for HashMap<K, V, S> {
@@ -1610,4 +1636,49 @@ mod tests {
 
         run_deferred();
     }
+
+    #[test]
+    fn keys_in_single_segment() {
+        let map =
+            HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());
+
+        assert!(map.is_empty());
+        assert_eq!(map.len(), 0);
+
+        const NUM_KEYS: usize = 200;
+
+        for i in 0..NUM_KEYS {
+            let key = Arc::new(i);
+            let hash = map.hash(&key);
+            assert_eq!(map.insert_entry_and(key, hash, i, |_, v| *v), None);
+        }
+
+        assert!(!map.is_empty());
+        assert_eq!(map.len(), NUM_KEYS);
+
+        let mut keys = map.keys(0, |k| Arc::clone(k)).unwrap();
+        assert_eq!(keys.len(), NUM_KEYS);
+        keys.sort_unstable();
+
+        for (i, key) in keys.into_iter().enumerate() {
+            assert_eq!(i, *key);
+        }
+
+        for i in (0..NUM_KEYS).step_by(2) {
+            assert_eq!(map.remove(&i, map.hash(&i)), Some(i));
+        }
+
+        assert!(!map.is_empty());
+        assert_eq!(map.len(), NUM_KEYS / 2);
+
+        let mut keys = map.keys(0, |k| Arc::clone(k)).unwrap();
+        assert_eq!(keys.len(), NUM_KEYS / 2);
+        keys.sort_unstable();
+
+        for (i, key) in keys.into_iter().enumerate() {
+            assert_eq!(i, *key / 2);
+        }
+
+        run_deferred();
+    }
 }
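The cht additions above give each segment a two-phase read path: `keys` snapshots the keys of one segment (cloning only `Arc`s, never values), and the caller later re-reads each key at the moment it is actually yielded. This split is what the higher-level iterators in this patch build on. A minimal, self-contained sketch of the pattern, using a toy mutex-guarded segment rather than the crate's lock-free table (all names here are illustrative, not the crate's):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Toy stand-in for one cht segment; the real crate uses a lock-free table.
struct Segment<K, V> {
    map: Mutex<HashMap<Arc<K>, V>>,
}

impl<K: std::hash::Hash + Eq, V: Clone> Segment<K, V> {
    // Phase 1 (`keys`): snapshot the keys of the segment.
    fn keys(&self) -> Vec<Arc<K>> {
        self.map.lock().unwrap().keys().map(Arc::clone).collect()
    }

    // Phase 2 (`scanning_get`): re-read each key at yield time. A key removed
    // since the snapshot simply resolves to `None` and is skipped.
    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
        self.map.lock().unwrap().get(key).cloned()
    }
}

fn main() {
    let seg = Segment { map: Mutex::new(HashMap::new()) };
    seg.map.lock().unwrap().insert(Arc::new("a"), 1);
    seg.map.lock().unwrap().insert(Arc::new("b"), 2);

    let snapshot = seg.keys();
    seg.map.lock().unwrap().remove(&Arc::new("b")); // concurrent removal

    // "b" is in the snapshot but is skipped, matching the guarantee that
    // entries removed after the iterator was created are not returned.
    let live: Vec<_> = snapshot
        .iter()
        .filter_map(|k| seg.scanning_get(k).map(|v| (Arc::clone(k), v)))
        .collect();
    assert_eq!(live.len(), 1);
}
```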
diff --git a/src/dash/cache.rs b/src/dash/cache.rs
index 85170f52..cd2515a9 100644
--- a/src/dash/cache.rs
+++ b/src/dash/cache.rs
@@ -1,6 +1,6 @@
 use super::{
     base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
-    CacheBuilder, ConcurrentCacheExt, Iter,
+    CacheBuilder, ConcurrentCacheExt, EntryRef, Iter,
 };
 use crate::{
     sync::{housekeeper::InnerSync, Weigher, WriteOp},
@@ -25,9 +25,6 @@ use std::{
 /// Since `DashMap` employs read-write locks on internal shards, it will have lower
 /// concurrency on retrievals and updates than other caches.
 ///
-/// On the other hand, `dash` cache provides iterator, which returns immutable
-/// references to the entries in a cache. Other caches do not provide iterator.
-///
 /// `dash` cache performs a best-effort bounding of the map using an entry
 /// replacement algorithm to determine which entries to evict when the capacity is
 /// exceeded.
@@ -436,13 +433,24 @@ where
     V: 'a,
     S: BuildHasher + Clone,
 {
-    /// Creates an iterator over a `moka::dash::Cache` yielding immutable references.
+    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
+    /// iterator element type is [`EntryRef<'a, K, V, S>`][moka-entry-ref].
+    ///
+    /// Unlike the `get` method, visiting entries via an iterator does not update
+    /// the historic popularity estimator or reset idle timers for keys.
+    ///
+    /// # Guarantees
+    ///
+    /// **TODO**
+    ///
+    /// # Locking behavior
     ///
-    /// **Locking behavior**: This iterator relies on the iterator of
-    /// [`dashmap::DashMap`][dashmap-iter], which employs read-write locks. May
-    /// deadlock if the thread holding an iterator attempts to update the cache.
+    /// This iterator relies on the iterator of [`dashmap::DashMap`][dashmap-iter],
+    /// which employs read-write locks. It may deadlock if the thread holding an
+    /// iterator attempts to update the cache.
     ///
-    /// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter
+    /// [moka-entry-ref]: ./struct.EntryRef.html
+    /// [dashmap-iter]: https://docs.rs/dashmap/5.2.0/dashmap/struct.DashMap.html#method.iter
     ///
     /// # Examples
     ///
@@ -478,6 +486,21 @@ where
     }
 }
 
+impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
+where
+    K: 'a + Eq + Hash,
+    V: 'a,
+    S: BuildHasher + Clone,
+{
+    type Item = EntryRef<'a, K, V, S>;
+
+    type IntoIter = Iter<'a, K, V, S>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
 // private methods
 impl<K, V, S> Cache<K, V, S>
 where
@@ -902,7 +925,7 @@ mod tests {
 
         let mut key_set = std::collections::HashSet::new();
 
-        for entry in cache.iter() {
+        for entry in &cache {
             let (key, value) = entry.pair();
             assert_eq!(value, &make_value(*key));
 
@@ -934,7 +957,10 @@ mod tests {
     ///
     #[test]
     fn test_iter_multi_threads() {
+        use std::collections::HashSet;
+
         const NUM_KEYS: usize = 1024;
+        const NUM_THREADS: usize = 16;
 
         fn make_value(key: usize) -> String {
             format!("val: {}", key)
@@ -955,7 +981,7 @@ mod tests {
 
         // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
         #[allow(clippy::needless_collect)]
-        let handles = (0..16usize)
+        let handles = (0..NUM_THREADS)
             .map(|n| {
                 let cache = cache.clone();
                 let rw_lock = Arc::clone(&rw_lock);
@@ -974,8 +1000,8 @@ mod tests {
                 // This thread will iterate the cache.
                 std::thread::spawn(move || {
                     let read_lock = rw_lock.read().unwrap();
-                    let mut key_set = std::collections::HashSet::new();
-                    for entry in cache.iter() {
+                    let mut key_set = HashSet::new();
+                    for entry in &cache {
                         let (key, value) = entry.pair();
                         assert_eq!(value, &make_value(*key));
                         key_set.insert(*key);
@@ -992,6 +1018,10 @@ mod tests {
         std::mem::drop(write_lock);
 
         handles.into_iter().for_each(|h| h.join().expect("Failed"));
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        let key_set = cache.iter().map(|ent| *ent.key()).collect::<HashSet<_>>();
+        assert_eq!(key_set.len(), NUM_KEYS);
     }
 
     #[test]
diff --git a/src/future/cache.rs b/src/future/cache.rs
index 1aef3b13..75c99edf 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -6,6 +6,7 @@ use crate::{
     sync::{
         base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
         housekeeper::InnerSync,
+        iter::Iter,
         PredicateId, Weigher, WriteOp,
     },
     Policy, PredicateError,
@@ -696,6 +697,67 @@ where
         self.base.invalidate_entries_if(Arc::new(predicate))
     }
 
+    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
+    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
+    /// value.
+    ///
+    /// Iterators do not block concurrent reads and writes on the cache. An entry
+    /// can be inserted into, invalidated or evicted from the cache while
+    /// iterators are alive on the same cache.
+    ///
+    /// Unlike the `get` method, visiting entries via an iterator does not update
+    /// the historic popularity estimator or reset idle timers for keys.
+    ///
+    /// # Guarantees
+    ///
+    /// In order to allow concurrent access to the cache, the iterator's `next`
+    /// method does _not_ guarantee the following:
+    ///
+    /// - It does not guarantee to return a key-value pair (an entry) if its key
+    ///   was inserted into the cache _after_ the iterator was created.
+    ///   - Such an entry may or may not be returned depending on the key's hash
+    ///     and timing.
+    ///
+    /// and the `next` method _does_ guarantee the following:
+    ///
+    /// - It guarantees not to return the same entry more than once.
+    /// - It guarantees not to return an entry if it has been removed from the
+    ///   cache after the iterator was created.
+    ///     - Note: An entry can be removed for the following reasons:
+    ///         - Manually invalidated.
+    ///         - Expired (e.g. time-to-live).
+    ///         - Evicted because the cache capacity was exceeded.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// // Cargo.toml
+    /// //
+    /// // [dependencies]
+    /// // moka = { version = "0.8.2", features = ["future"] }
+    /// // tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
+    /// use moka::future::Cache;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let cache = Cache::new(100);
+    ///     cache.insert("Julia", 14).await;
+    ///
+    ///     let mut iter = cache.iter();
+    ///     let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
+    ///     assert_eq!(*k, "Julia");
+    ///     assert_eq!(v, 14);
+    ///
+    ///     assert!(iter.next().is_none());
+    /// }
+    /// ```
+    ///
+    pub fn iter(&self) -> Iter<'_, K, V> {
+        use crate::sync::iter::ScanningGet;
+
+        Iter::with_single_cache_segment(&self.base, self.base.num_cht_segments())
+    }
+
     /// Returns a `BlockingOp` for this cache. It provides blocking
     /// [insert](#method.insert) and [invalidate](#method.invalidate) methods, which
     /// can be called outside of asynchronous contexts.
@@ -727,6 +789,21 @@ where
     }
 }
 
+impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
+where
+    K: Hash + Eq + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+    S: BuildHasher + Clone + Send + Sync + 'static,
+{
+    type Item = (Arc<K>, V);
+
+    type IntoIter = Iter<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
 impl<K, V, S> ConcurrentCacheExt<K, V> for Cache<K, V, S>
 where
     K: Hash + Eq + Send + Sync + 'static,
@@ -738,7 +815,9 @@ where
     }
 }
 
+//
 // private methods
+//
 impl<K, V, S> Cache<K, V, S>
 where
     K: Hash + Eq + Send + Sync + 'static,
@@ -1131,10 +1210,10 @@ mod tests {
 
     #[tokio::test]
     async fn basic_multi_async_tasks() {
-        let num_threads = 4;
+        let num_tasks = 4;
 
         let cache = Cache::new(100);
-        let tasks = (0..num_threads)
+        let tasks = (0..num_tasks)
             .map(|id| {
                 let cache = cache.clone();
                 if id == 0 {
@@ -1306,10 +1385,12 @@ mod tests {
         assert!(cache.contains_key(&"a"));
 
         mock.increment(Duration::from_secs(5)); // 10 secs.
-        cache.sync();
-
         assert_eq!(cache.get(&"a"), None);
         assert!(!cache.contains_key(&"a"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
 
         cache.insert("b", "bob").await;
@@ -1335,12 +1416,15 @@ mod tests {
         assert_eq!(cache.estimated_entry_count(), 1);
 
         mock.increment(Duration::from_secs(5)); // 25 secs
-        cache.sync();
 
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
     }
 
@@ -1386,24 +1470,128 @@ mod tests {
         assert_eq!(cache.estimated_entry_count(), 2);
 
         mock.increment(Duration::from_secs(3)); // 15 secs.
-        cache.sync();
-
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), Some("bob"));
         assert!(!cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
-        assert_eq!(cache.estimated_entry_count(), 1);
 
-        mock.increment(Duration::from_secs(10)); // 25 secs
+        assert_eq!(cache.iter().count(), 1);
 
+        cache.sync();
+        assert_eq!(cache.estimated_entry_count(), 1);
+
+        mock.increment(Duration::from_secs(10)); // 25 secs
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
     }
 
+    #[tokio::test]
+    async fn test_iter() {
+        const NUM_KEYS: usize = 50;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = Cache::builder()
+            .max_capacity(100)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key)).await;
+        }
+
+        let mut key_set = std::collections::HashSet::new();
+
+        for (key, value) in &cache {
+            assert_eq!(value, make_value(*key));
+
+            key_set.insert(*key);
+        }
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
+    /// Runs 16 async tasks at the same time and ensures no deadlock occurs.
+    ///
+    /// - Eight of the tasks will update key-values in the cache.
+    /// - Eight others will iterate the cache.
+    ///
+    #[tokio::test]
+    async fn test_iter_multi_async_tasks() {
+        use std::collections::HashSet;
+
+        const NUM_KEYS: usize = 1024;
+        const NUM_TASKS: usize = 16;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = Cache::builder()
+            .max_capacity(2048)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        // Initialize the cache.
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key)).await;
+        }
+
+        let rw_lock = Arc::new(tokio::sync::RwLock::<()>::default());
+        let write_lock = rw_lock.write().await;
+
+        let tasks = (0..NUM_TASKS)
+            .map(|n| {
+                let cache = cache.clone();
+                let rw_lock = Arc::clone(&rw_lock);
+
+                if n % 2 == 0 {
+                    // This task will update the cache.
+                    tokio::spawn(async move {
+                        let read_lock = rw_lock.read().await;
+                        for key in 0..NUM_KEYS {
+                            // TODO: Update keys in a random order?
+                            cache.insert(key, make_value(key)).await;
+                        }
+                        std::mem::drop(read_lock);
+                    })
+                } else {
+                    // This task will iterate the cache.
+                    tokio::spawn(async move {
+                        let read_lock = rw_lock.read().await;
+                        let mut key_set = HashSet::new();
+                        for (key, value) in &cache {
+                            assert_eq!(value, make_value(*key));
+                            key_set.insert(*key);
+                        }
+                        // Ensure there are no missing or duplicate keys in the
+                        // iteration.
+                        assert_eq!(key_set.len(), NUM_KEYS);
+                        std::mem::drop(read_lock);
+                    })
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Let these tasks run by releasing the write lock.
+        std::mem::drop(write_lock);
+
+        let _ = futures_util::future::join_all(tasks).await;
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
     #[tokio::test]
     async fn get_with() {
         let cache = Cache::new(100);
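With the `iter` method and `IntoIterator` impl added above, a `future::Cache` can be scanned without blocking writers. A small usage sketch (not part of the patch; the dependency versions, key names and totals are illustrative):

```rust
// Assumed Cargo.toml:
//
// [dependencies]
// moka = { version = "0.8.2", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<String, u64> = Cache::new(1_000);

    for i in 0..10u64 {
        cache.insert(format!("user-{}", i), i * 10).await;
    }

    // The new `IntoIterator` impl lets a `for` loop borrow the cache directly;
    // each element is an `(Arc<String>, u64)` pair holding a value clone.
    let mut total = 0;
    for (_key, value) in &cache {
        total += value;
    }
    assert_eq!(total, 450);

    // Equivalent, using the inherent `iter` method with iterator adapters.
    let total2: u64 = cache.iter().map(|(_k, v)| v).sum();
    assert_eq!(total2, total);
}
```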
diff --git a/src/sync.rs b/src/sync.rs
index 78bb30ce..080e2b30 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -14,6 +14,7 @@ pub(crate) mod deques;
 pub(crate) mod entry_info;
 pub(crate) mod housekeeper;
 mod invalidator;
+pub(crate) mod iter;
 mod segment;
 mod value_initializer;
 
@@ -22,6 +23,7 @@ pub(crate) mod debug_counters;
 
 pub use builder::CacheBuilder;
 pub use cache::Cache;
+pub use iter::Iter;
 pub use segment::SegmentedCache;
 
 use self::entry_info::EntryInfo;
diff --git a/src/sync/base_cache.rs b/src/sync/base_cache.rs
index 44dda735..e71541ab 100644
--- a/src/sync/base_cache.rs
+++ b/src/sync/base_cache.rs
@@ -3,6 +3,7 @@ use super::{
     entry_info::EntryInfo,
     housekeeper::{Housekeeper, InnerSync, SyncPace},
     invalidator::{GetOrRemoveEntry, InvalidationResult, Invalidator, KeyDateLite, PredicateFun},
+    iter::ScanningGet,
     AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, PredicateId, ReadOp, ValueEntry, Weigher,
     WriteOp,
 };
@@ -246,6 +247,44 @@ where
     }
 }
 
+//
+// Iterator support
+//
+impl<K, V, S> ScanningGet<K, V> for BaseCache<K, V, S>
+where
+    K: Hash + Eq + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+    S: BuildHasher + Clone + Send + Sync + 'static,
+{
+    fn num_cht_segments(&self) -> usize {
+        self.inner.num_cht_segments()
+    }
+
+    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
+        let hash = self.hash(key);
+        self.inner.get_key_value_and_then(key, hash, |k, entry| {
+            let i = &self.inner;
+            let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after());
+            let now = i.current_time_from_expiration_clock();
+
+            if is_expired_entry_wo(ttl, va, entry, now)
+                || is_expired_entry_ao(tti, va, entry, now)
+                || i.is_invalidated_entry(k, entry)
+            {
+                // Expired or invalidated entry.
+                None
+            } else {
+                // Valid entry.
+                Some(entry.value.clone())
+            }
+        })
+    }
+
+    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
+        self.inner.keys(cht_segment)
+    }
+}
+
 //
 // private methods
 //
@@ -602,6 +641,19 @@ where
             .map(|(key, entry)| KvEntry::new(key, entry))
     }
 
+    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
+        // Do `Arc::clone` instead of `Arc::downgrade`. Updating an existing
+        // entry in the cht with a new value replaces the key in the cht even
+        // though the old and new keys are equal. If we returned `Weak<K>`, it
+        // might not be upgradable to `Arc<K>` later, because the key may have
+        // been replaced with a new key that is equal to the old one.
+        self.cache.keys(cht_segment, Arc::clone)
+    }
+
+    fn num_cht_segments(&self) -> usize {
+        self.cache.actual_num_segments()
+    }
+
     fn policy(&self) -> Policy {
         Policy::new(self.max_capacity, 1, self.time_to_live, self.time_to_idle)
     }
diff --git a/src/sync/cache.rs b/src/sync/cache.rs
index e23007a1..9b178128 100644
--- a/src/sync/cache.rs
+++ b/src/sync/cache.rs
@@ -1,6 +1,7 @@
 use super::{
     base_cache::{BaseCache, HouseKeeperArc, MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS},
     housekeeper::InnerSync,
+    iter::{Iter, ScanningGet},
     value_initializer::ValueInitializer,
     CacheBuilder, ConcurrentCacheExt, PredicateId, Weigher, WriteOp,
 };
@@ -696,6 +697,57 @@ where
         self.base.invalidate_entries_if(predicate)
     }
 
+    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
+    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
+    /// value.
+    ///
+    /// Iterators do not block concurrent reads and writes on the cache. An entry
+    /// can be inserted into, invalidated or evicted from the cache while
+    /// iterators are alive on the same cache.
+    ///
+    /// Unlike the `get` method, visiting entries via an iterator does not update
+    /// the historic popularity estimator or reset idle timers for keys.
+    ///
+    /// # Guarantees
+    ///
+    /// In order to allow concurrent access to the cache, the iterator's `next`
+    /// method does _not_ guarantee the following:
+    ///
+    /// - It does not guarantee to return a key-value pair (an entry) if its key
+    ///   was inserted into the cache _after_ the iterator was created.
+    ///   - Such an entry may or may not be returned depending on the key's hash
+    ///     and timing.
+    ///
+    /// and the `next` method _does_ guarantee the following:
+    ///
+    /// - It guarantees not to return the same entry more than once.
+    /// - It guarantees not to return an entry if it has been removed from the
+    ///   cache after the iterator was created.
+    ///     - Note: An entry can be removed for the following reasons:
+    ///         - Manually invalidated.
+    ///         - Expired (e.g. time-to-live).
+    ///         - Evicted because the cache capacity was exceeded.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// use moka::sync::Cache;
+    ///
+    /// let cache = Cache::new(100);
+    /// cache.insert("Julia", 14);
+    ///
+    /// let mut iter = cache.iter();
+    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
+    /// assert_eq!(*k, "Julia");
+    /// assert_eq!(v, 14);
+    ///
+    /// assert!(iter.next().is_none());
+    /// ```
+    ///
+    pub fn iter(&self) -> Iter<'_, K, V> {
+        Iter::with_single_cache_segment(&self.base, self.num_cht_segments())
+    }
+
     /// Returns a read-only cache policy of this cache.
     ///
     /// At this time, cache policy cannot be modified after cache creation.
@@ -715,6 +767,21 @@ where
     }
 }
 
+impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
+where
+    K: Hash + Eq + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+    S: BuildHasher + Clone + Send + Sync + 'static,
+{
+    type Item = (Arc<K>, V);
+
+    type IntoIter = Iter<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
 impl<K, V, S> ConcurrentCacheExt<K, V> for Cache<K, V, S>
 where
     K: Hash + Eq + Send + Sync + 'static,
@@ -726,7 +793,31 @@ where
     }
 }
 
+//
+// Iterator support
+//
+impl<K, V, S> ScanningGet<K, V> for Cache<K, V, S>
+where
+    K: Hash + Eq + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+    S: BuildHasher + Clone + Send + Sync + 'static,
+{
+    fn num_cht_segments(&self) -> usize {
+        self.base.num_cht_segments()
+    }
+
+    fn scanning_get(&self, key: &Arc<K>) -> Option<V> {
+        self.base.scanning_get(key)
+    }
+
+    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>> {
+        self.base.keys(cht_segment)
+    }
+}
+
+//
 // private methods
+//
 impl<K, V, S> Cache<K, V, S>
 where
     K: Hash + Eq + Send + Sync + 'static,
@@ -1120,10 +1211,12 @@ mod tests {
         assert!(cache.contains_key(&"a"));
 
         mock.increment(Duration::from_secs(5)); // 10 secs.
-        cache.sync();
-
         assert_eq!(cache.get(&"a"), None);
         assert!(!cache.contains_key(&"a"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
 
         cache.insert("b", "bob");
@@ -1149,12 +1242,15 @@ mod tests {
         assert_eq!(cache.estimated_entry_count(), 1);
 
         mock.increment(Duration::from_secs(5)); // 25 secs
-        cache.sync();
 
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
     }
 
@@ -1200,24 +1296,130 @@ mod tests {
         assert_eq!(cache.estimated_entry_count(), 2);
 
         mock.increment(Duration::from_secs(3)); // 15 secs.
-        cache.sync();
-
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), Some("bob"));
         assert!(!cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
-        assert_eq!(cache.estimated_entry_count(), 1);
 
-        mock.increment(Duration::from_secs(10)); // 25 secs
+        assert_eq!(cache.iter().count(), 1);
 
+        cache.sync();
+        assert_eq!(cache.estimated_entry_count(), 1);
+
+        mock.increment(Duration::from_secs(10)); // 25 secs
         assert_eq!(cache.get(&"a"), None);
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+
+        assert_eq!(cache.iter().count(), 0);
+
+        cache.sync();
         assert!(cache.is_table_empty());
     }
 
+    #[test]
+    fn test_iter() {
+        const NUM_KEYS: usize = 50;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = Cache::builder()
+            .max_capacity(100)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key));
+        }
+
+        let mut key_set = std::collections::HashSet::new();
+
+        for (key, value) in &cache {
+            assert_eq!(value, make_value(*key));
+
+            key_set.insert(*key);
+        }
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
+    /// Runs 16 threads at the same time and ensures no deadlock occurs.
+    ///
+    /// - Eight of the threads will update key-values in the cache.
+    /// - Eight others will iterate the cache.
+    ///
+    #[test]
+    fn test_iter_multi_threads() {
+        use std::collections::HashSet;
+
+        const NUM_KEYS: usize = 1024;
+        const NUM_THREADS: usize = 16;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = Cache::builder()
+            .max_capacity(2048)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        // Initialize the cache.
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key));
+        }
+
+        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
+        let write_lock = rw_lock.write().unwrap();
+
+        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
+        #[allow(clippy::needless_collect)]
+        let handles = (0..NUM_THREADS)
+            .map(|n| {
+                let cache = cache.clone();
+                let rw_lock = Arc::clone(&rw_lock);
+
+                if n % 2 == 0 {
+                    // This thread will update the cache.
+                    std::thread::spawn(move || {
+                        let read_lock = rw_lock.read().unwrap();
+                        for key in 0..NUM_KEYS {
+                            // TODO: Update keys in a random order?
+                            cache.insert(key, make_value(key));
+                        }
+                        std::mem::drop(read_lock);
+                    })
+                } else {
+                    // This thread will iterate the cache.
+                    std::thread::spawn(move || {
+                        let read_lock = rw_lock.read().unwrap();
+                        let mut key_set = HashSet::new();
+                        for (key, value) in &cache {
+                            assert_eq!(value, make_value(*key));
+                            key_set.insert(*key);
+                        }
+                        // Ensure there are no missing or duplicate keys in the
+                        // iteration.
+                        assert_eq!(key_set.len(), NUM_KEYS);
+                        std::mem::drop(read_lock);
+                    })
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Let these threads run by releasing the write lock.
+        std::mem::drop(write_lock);
+
+        handles.into_iter().for_each(|h| h.join().expect("Failed"));
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
     #[test]
     fn get_with() {
         use std::thread::{sleep, spawn};
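The same surface is available on `sync::Cache`. A usage sketch (not part of the patch; `moka = "0.8.2"` and the spawned writer thread are illustrative):

```rust
use moka::sync::Cache;

fn main() {
    let cache: Cache<i32, String> = Cache::new(100);
    cache.insert(1, "one".to_string());
    cache.insert(2, "two".to_string());

    // Iteration never blocks writers; each element is `(Arc<i32>, String)`
    // where the `String` is a clone of the stored value.
    let mut pairs: Vec<(i32, String)> = cache.iter().map(|(k, v)| (*k, v)).collect();
    pairs.sort_by_key(|(k, _)| *k);
    assert_eq!(pairs, vec![(1, "one".to_string()), (2, "two".to_string())]);

    // A writer on another thread may run while an iterator is alive. Whether
    // the new entry is observed is not guaranteed (see the docs above).
    let writer = {
        let cache = cache.clone();
        std::thread::spawn(move || cache.insert(3, "three".to_string()))
    };
    for (_key, _value) in &cache {
        // Key 3 may or may not show up here.
    }
    writer.join().unwrap();
}
```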
diff --git a/src/sync/iter.rs b/src/sync/iter.rs
new file mode 100644
index 00000000..ef730882
--- /dev/null
+++ b/src/sync/iter.rs
@@ -0,0 +1,135 @@
+use std::{hash::Hash, sync::Arc};
+
+// This trait is implemented by `sync::BaseCache` and `sync::Cache`.
+pub(crate) trait ScanningGet<K, V> {
+    /// Returns the number of segments in the concurrent hash table.
+    fn num_cht_segments(&self) -> usize;
+
+    /// Returns a _clone_ of the value corresponding to the key.
+    ///
+    /// Unlike the `get` method of a cache, this method is not considered a cache
+    /// read operation, so it does not update the historic popularity estimator or
+    /// reset the idle timer for the key.
+    fn scanning_get(&self, key: &Arc<K>) -> Option<V>;
+
+    /// Returns a vec of keys in a specified segment of the concurrent hash table.
+    fn keys(&self, cht_segment: usize) -> Option<Vec<Arc<K>>>;
+}
+
+pub struct Iter<'i, K, V> {
+    keys: Option<Vec<Arc<K>>>,
+    cache_segments: Box<[&'i dyn ScanningGet<K, V>]>,
+    num_cht_segments: usize,
+    cache_seg_index: usize,
+    cht_seg_index: usize,
+    is_done: bool,
+}
+
+impl<'i, K, V> Iter<'i, K, V> {
+    pub(crate) fn with_single_cache_segment(
+        cache: &'i dyn ScanningGet<K, V>,
+        num_cht_segments: usize,
+    ) -> Self {
+        Self {
+            keys: None,
+            cache_segments: Box::new([cache]),
+            num_cht_segments,
+            cache_seg_index: 0,
+            cht_seg_index: 0,
+            is_done: false,
+        }
+    }
+
+    pub(crate) fn with_multiple_cache_segments(
+        cache_segments: Box<[&'i dyn ScanningGet<K, V>]>,
+        num_cht_segments: usize,
+    ) -> Self {
+        Self {
+            keys: None,
+            cache_segments,
+            num_cht_segments,
+            cache_seg_index: 0,
+            cht_seg_index: 0,
+            is_done: false,
+        }
+    }
+}
+
+impl<'i, K, V> Iterator for Iter<'i, K, V>
+where
+    K: Eq + Hash + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+{
+    type Item = (Arc<K>, V);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.is_done {
+            return None;
+        }
+
+        while let Some(key) = self.next_key() {
+            if let Some(v) = self.cache().scanning_get(&key) {
+                return Some((key, v));
+            }
+        }
+
+        self.is_done = true;
+        None
+    }
+}
+
+impl<'i, K, V> Iter<'i, K, V>
+where
+    K: Eq + Hash + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+{
+    fn cache(&self) -> &'i dyn ScanningGet<K, V> {
+        self.cache_segments[self.cache_seg_index]
+    }
+
+    fn next_key(&mut self) -> Option<Arc<K>> {
+        while let Some(keys) = self.current_keys() {
+            if let key @ Some(_) = keys.pop() {
+                return key;
+            }
+        }
+        None
+    }
+
+    fn current_keys(&mut self) -> Option<&mut Vec<Arc<K>>> {
+        // If keys is none, or some but empty, try to get the next keys.
+        while self.keys.as_ref().map_or(true, Vec::is_empty) {
+            // Adjust indices.
+            if self.cht_seg_index >= self.num_cht_segments {
+                self.cache_seg_index += 1;
+                self.cht_seg_index = 0;
+                if self.cache_seg_index >= self.cache_segments.len() {
+                    // No more cache segments left.
+                    return None;
+                }
+            }
+
+            let cache_segment = self.cache_segments[self.cache_seg_index];
+            self.keys = cache_segment.keys(self.cht_seg_index);
+            self.num_cht_segments = cache_segment.num_cht_segments();
+
+            self.cht_seg_index += 1;
+        }
+
+        self.keys.as_mut()
+    }
+}
+
+unsafe impl<'a, 'i, K, V> Send for Iter<'i, K, V>
+where
+    K: 'a + Eq + Hash + Send,
+    V: 'a + Send,
+{
+}
+
+unsafe impl<'a, 'i, K, V> Sync for Iter<'i, K, V>
+where
+    K: 'a + Eq + Hash + Sync,
+    V: 'a + Sync,
+{
+}
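The `current_keys` loop above drives a two-level scan: the outer level walks cache segments (one for `sync::Cache`, several for `SegmentedCache`), the inner level walks cht segments, and at most one batch of keys is held in memory at a time. A stand-alone toy model of that control flow (none of these names are the crate's; re-validation is reduced to a closure):

```rust
fn main() {
    // Two "cache segments", each holding two "cht segments" of keys.
    let segments: Vec<Vec<Vec<&str>>> = vec![
        vec![vec!["a", "b"], vec!["c"]],
        vec![vec!["d"], vec!["e", "f"]],
    ];
    // Pretend "e" was removed after its key batch was snapshotted.
    let still_present = |k: &&str| *k != "e";

    let mut yielded = Vec::new();
    for cache_seg in &segments {
        for cht_seg in cache_seg {
            // `Iter` holds one such batch in `self.keys` and pops from it.
            let mut batch: Vec<&str> = cht_seg.clone();
            while let Some(key) = batch.pop() {
                // Mirrors `scanning_get`: keys that no longer resolve are
                // silently skipped rather than yielded.
                if still_present(&key) {
                    yielded.push(key);
                }
            }
        }
    }
    assert_eq!(yielded, vec!["b", "a", "c", "d", "f"]);
}
```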
diff --git a/src/sync/segment.rs b/src/sync/segment.rs
index 34cbb1ec..f2b5388e 100644
--- a/src/sync/segment.rs
+++ b/src/sync/segment.rs
@@ -1,4 +1,8 @@
-use super::{cache::Cache, CacheBuilder, ConcurrentCacheExt, Weigher};
+use super::{
+    cache::Cache,
+    iter::{Iter, ScanningGet},
+    CacheBuilder, ConcurrentCacheExt, Weigher,
+};
 use crate::{Policy, PredicateError};
 
 use std::{
@@ -287,6 +291,65 @@ where
         Ok(())
     }
 
+    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
+    /// iterator element type is `(Arc<K>, V)`, where `V` is a clone of a stored
+    /// value.
+    ///
+    /// Iterators do not block concurrent reads and writes on the cache. An entry
+    /// can be inserted into, invalidated or evicted from the cache while
+    /// iterators are alive on the same cache.
+    ///
+    /// Unlike the `get` method, visiting entries via an iterator does not update
+    /// the historic popularity estimator or reset idle timers for keys.
+    ///
+    /// # Guarantees
+    ///
+    /// In order to allow concurrent access to the cache, the iterator's `next`
+    /// method does _not_ guarantee the following:
+    ///
+    /// - It does not guarantee to return a key-value pair (an entry) if its key
+    ///   was inserted into the cache _after_ the iterator was created.
+    ///   - Such an entry may or may not be returned depending on the key's hash
+    ///     and timing.
+    ///
+    /// and the `next` method _does_ guarantee the following:
+    ///
+    /// - It guarantees not to return the same entry more than once.
+    /// - It guarantees not to return an entry if it has been removed from the
+    ///   cache after the iterator was created.
+    ///     - Note: An entry can be removed for the following reasons:
+    ///         - Manually invalidated.
+    ///         - Expired (e.g. time-to-live).
+    ///         - Evicted because the cache capacity was exceeded.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// use moka::sync::SegmentedCache;
+    ///
+    /// let cache = SegmentedCache::new(100, 4);
+    /// cache.insert("Julia", 14);
+    ///
+    /// let mut iter = cache.iter();
+    /// let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
+    /// assert_eq!(*k, "Julia");
+    /// assert_eq!(v, 14);
+    ///
+    /// assert!(iter.next().is_none());
+    /// ```
+    ///
+    pub fn iter(&self) -> Iter<'_, K, V> {
+        let num_cht_segments = self.inner.segments[0].num_cht_segments();
+        let segments = self
+            .inner
+            .segments
+            .iter()
+            .map(|c| c as &dyn ScanningGet<_, _>)
+            .collect::<Vec<_>>()
+            .into_boxed_slice();
+        Iter::with_multiple_cache_segments(segments, num_cht_segments)
+    }
+
     /// Returns a read-only cache policy of this cache.
     ///
     /// At this time, cache policy cannot be modified after cache creation.
@@ -326,6 +389,21 @@ where
     // }
 }
 
+impl<'a, K, V, S> IntoIterator for &'a SegmentedCache<K, V, S>
+where
+    K: Hash + Eq + Send + Sync + 'static,
+    V: Clone + Send + Sync + 'static,
+    S: BuildHasher + Clone + Send + Sync + 'static,
+{
+    type Item = (Arc<K>, V);
+
+    type IntoIter = Iter<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
 impl<K, V, S> ConcurrentCacheExt<K, V> for SegmentedCache<K, V, S>
 where
     K: Hash + Eq + Send + Sync + 'static,
@@ -481,7 +559,7 @@ where
 #[cfg(test)]
 mod tests {
     use super::{ConcurrentCacheExt, SegmentedCache};
-    use std::time::Duration;
+    use std::{sync::Arc, time::Duration};
 
     #[test]
     fn basic_single_thread() {
@@ -796,6 +874,108 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_iter() {
+        const NUM_KEYS: usize = 50;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = SegmentedCache::builder(4)
+            .max_capacity(100)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key));
+        }
+
+        let mut key_set = std::collections::HashSet::new();
+
+        for (key, value) in &cache {
+            assert_eq!(value, make_value(*key));
+
+            key_set.insert(*key);
+        }
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
+    /// Runs 16 threads at the same time and ensures no deadlock occurs.
+    ///
+    /// - Eight of the threads will update key-values in the cache.
+    /// - Eight others will iterate the cache.
+    ///
+    #[test]
+    fn test_iter_multi_threads() {
+        use std::collections::HashSet;
+
+        const NUM_KEYS: usize = 1024;
+        const NUM_THREADS: usize = 16;
+
+        fn make_value(key: usize) -> String {
+            format!("val: {}", key)
+        }
+
+        let cache = SegmentedCache::builder(4)
+            .max_capacity(2048)
+            .time_to_idle(Duration::from_secs(10))
+            .build();
+
+        // Initialize the cache.
+        for key in 0..NUM_KEYS {
+            cache.insert(key, make_value(key));
+        }
+
+        let rw_lock = Arc::new(std::sync::RwLock::<()>::default());
+        let write_lock = rw_lock.write().unwrap();
+
+        // https://rust-lang.github.io/rust-clippy/master/index.html#needless_collect
+        #[allow(clippy::needless_collect)]
+        let handles = (0..NUM_THREADS)
+            .map(|n| {
+                let cache = cache.clone();
+                let rw_lock = Arc::clone(&rw_lock);
+
+                if n % 2 == 0 {
+                    // This thread will update the cache.
+                    std::thread::spawn(move || {
+                        let read_lock = rw_lock.read().unwrap();
+                        for key in 0..NUM_KEYS {
+                            // TODO: Update keys in a random order?
+                            cache.insert(key, make_value(key));
+                        }
+                        std::mem::drop(read_lock);
+                    })
+                } else {
+                    // This thread will iterate the cache.
+                    std::thread::spawn(move || {
+                        let read_lock = rw_lock.read().unwrap();
+                        let mut key_set = HashSet::new();
+                        for (key, value) in &cache {
+                            assert_eq!(value, make_value(*key));
+                            key_set.insert(*key);
+                        }
+                        // Ensure there are no missing or duplicate keys in the
+                        // iteration.
+                        assert_eq!(key_set.len(), NUM_KEYS);
+                        std::mem::drop(read_lock);
+                    })
+                }
+            })
+            .collect::<Vec<_>>();
+
+        // Let these threads run by releasing the write lock.
+        std::mem::drop(write_lock);
+
+        handles.into_iter().for_each(|h| h.join().expect("Failed"));
+
+        // Ensure there are no missing or duplicate keys in the iteration.
+        let key_set = cache.iter().map(|(k, _v)| *k).collect::<HashSet<_>>();
+        assert_eq!(key_set.len(), NUM_KEYS);
+    }
+
     #[test]
     fn get_with() {
         use std::thread::{sleep, spawn};
diff --git a/src/unsync.rs b/src/unsync.rs
index b4df6338..84e18a0c 100644
--- a/src/unsync.rs
+++ b/src/unsync.rs
@@ -6,12 +6,14 @@ mod builder;
 mod cache;
 mod deques;
+mod iter;
 
 use std::{ptr::NonNull, rc::Rc};
 use tagptr::TagNonNull;
 
 pub use builder::CacheBuilder;
 pub use cache::Cache;
+pub use iter::Iter;
 
 use crate::common::{deque::DeqNode, time::Instant};
diff --git a/src/unsync/cache.rs b/src/unsync/cache.rs
index c1c39efe..e1655ab1 100644
--- a/src/unsync/cache.rs
+++ b/src/unsync/cache.rs
@@ -1,4 +1,6 @@
-use super::{deques::Deques, AccessTime, CacheBuilder, KeyDate, KeyHashDate, ValueEntry, Weigher};
+use super::{
+    deques::Deques, AccessTime, CacheBuilder, Iter, KeyDate, KeyHashDate, ValueEntry, Weigher,
+};
 use crate::{
     common::{
         self,
@@ -270,8 +272,6 @@ where
     ///
     /// The key may be any borrowed form of the cache's key type, but `Hash` and `Eq`
     /// on the borrowed form _must_ match those for the key type.
-    ///
-    /// [rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html
    pub fn get<Q>(&mut self, key: &Q) -> Option<&V>
    where
        Rc<K>: Borrow<Q>,
@@ -303,6 +303,12 @@ where
         }
     }
 
+    pub(crate) fn is_expired_entry(&self, entry: &ValueEntry<K, V>) -> bool {
+        let now = self.current_time_from_expiration_clock();
+        Self::is_expired_entry_wo(&self.time_to_live, entry, now)
+            || Self::is_expired_entry_ao(&self.time_to_idle, entry, now)
+    }
+
     /// Inserts a key-value pair into the cache.
     ///
     /// If the cache has this key present, the value is updated.
@@ -393,6 +399,32 @@ where
         self.saturating_sub_from_total_weight(invalidated);
     }
 
+    /// Creates an iterator visiting all key-value pairs in arbitrary order. The
+    /// iterator element type is `(&K, &V)`.
+    ///
+    /// Unlike the `get` method, visiting entries via an iterator does not update
+    /// the historic popularity estimator or reset idle timers for keys.
+    ///
+    /// # Examples
+    ///
+    /// ```rust
+    /// use moka::unsync::Cache;
+    ///
+    /// let mut cache = Cache::new(100);
+    /// cache.insert("Julia", 14);
+    ///
+    /// let mut iter = cache.iter();
+    /// let (k, v) = iter.next().unwrap(); // (&K, &V)
+    /// assert_eq!(k, &"Julia");
+    /// assert_eq!(v, &14);
+    ///
+    /// assert!(iter.next().is_none());
+    /// ```
+    ///
+    pub fn iter(&self) -> Iter<'_, K, V, S> {
+        Iter::new(self, self.cache.iter())
+    }
+
     /// Returns a read-only cache policy of this cache.
     ///
     /// At this time, cache policy cannot be modified after cache creation.
@@ -1204,6 +1236,7 @@ mod tests {
 
         assert_eq!(cache.get(&"a"), None);
         assert!(!cache.contains_key(&"a"));
+        assert_eq!(cache.iter().count(), 0);
         assert!(cache.cache.is_empty());
 
         cache.insert("b", "bob");
@@ -1230,6 +1263,7 @@ mod tests {
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+        assert_eq!(cache.iter().count(), 0);
         assert!(cache.cache.is_empty());
     }
 
@@ -1270,6 +1304,7 @@ mod tests {
         assert_eq!(cache.get(&"b"), Some(&"bob"));
         assert!(!cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
+        assert_eq!(cache.iter().count(), 1);
         assert_eq!(cache.cache.len(), 1);
 
         mock.increment(Duration::from_secs(10)); // 25 secs
@@ -1278,6 +1313,7 @@ mod tests {
         assert_eq!(cache.get(&"b"), None);
         assert!(!cache.contains_key(&"a"));
         assert!(!cache.contains_key(&"b"));
+        assert_eq!(cache.iter().count(), 0);
         assert!(cache.cache.is_empty());
     }
 
diff --git a/src/unsync/iter.rs b/src/unsync/iter.rs
new file mode 100644
index 00000000..49a2aea4
--- /dev/null
+++ b/src/unsync/iter.rs
@@ -0,0 +1,36 @@
+use super::{Cache, ValueEntry};
+
+use std::{
+    hash::{BuildHasher, Hash},
+    rc::Rc,
+};
+
+type HashMapIter<'i, K, V> = std::collections::hash_map::Iter<'i, Rc<K>, ValueEntry<K, V>>;
+
+pub struct Iter<'i, K, V, S> {
+    cache: &'i Cache<K, V, S>,
+    iter: HashMapIter<'i, K, V>,
+}
+
+impl<'i, K, V, S> Iter<'i, K, V, S> {
+    pub(crate) fn new(cache: &'i Cache<K, V, S>, iter: HashMapIter<'i, K, V>) -> Self {
+        Self { cache, iter }
+    }
+}
+
+impl<'i, K, V, S> Iterator for Iter<'i, K, V, S>
+where
+    K: Hash + Eq,
+    S: BuildHasher + Clone,
+{
+    type Item = (&'i K, &'i V);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        for (k, entry) in self.iter.by_ref() {
+            if !self.cache.is_expired_entry(entry) {
+                return Some((k, &entry.value));
+            }
+        }
+        None
+    }
+}
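Unlike the concurrent caches, `unsync::Cache::iter` borrows the cache immutably and yields `(&K, &V)` with no cloning, so the borrow checker statically rules out mutation during iteration. A usage sketch (not part of the patch; `moka = "0.8.2"` and the keys shown are illustrative):

```rust
use moka::unsync::Cache;

fn main() {
    let mut cache: Cache<&str, u32> = Cache::new(10);
    cache.insert("a", 1);
    cache.insert("b", 2);

    // Borrowed iteration; expired entries are silently skipped by `next`.
    let sum: u32 = cache.iter().map(|(_k, v)| *v).sum();
    assert_eq!(sum, 3);

    // The single-threaded contract is enforced at compile time: `insert`
    // takes `&mut self`, so it cannot be called while an iterator is alive.
    for (key, value) in cache.iter() {
        println!("{}: {}", key, value);
        // cache.insert("c", 3); // <- would fail to compile here
    }
}
```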