Skip to content

Commit

Permalink
Merge pull request #169 from moka-rs/drop-evicted-entries-immediately
Browse files Browse the repository at this point in the history
Drop evicted entries immediately
  • Loading branch information
tatsuya6502 authored Jul 24, 2022
2 parents b17a45b + bb2656a commit f3b5d7e
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 7 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Moka Cache — Change Log

## Version 0.9.3

### Fixed

- Ensure that the following caches will drop the value of evicted entries immediately
after eviction ([#169][gh-pull-0169]):
- `sync::Cache`
- `sync::SegmentedCache`
- `future::Cache`


## Version 0.9.2

### Fixed
Expand Down Expand Up @@ -450,6 +461,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0167]: https://github.com/moka-rs/moka/pull/169/
[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "moka"
version = "0.9.2"
version = "0.9.3"
edition = "2018"
rust-version = "1.51"

description = "A fast and concurrent cache library inspired by Caffeine (Java)"
description = "A fast and concurrent cache library inspired by Java Caffeine"
license = "MIT OR Apache-2.0"
# homepage = "https://"
documentation = "https://docs.rs/moka/"
Expand Down
2 changes: 2 additions & 0 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}
}
}

guard.flush();
std::mem::drop(lock);

Some(next_array)
Expand Down
3 changes: 3 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub(crate) mod deque;
pub(crate) mod frequency_sketch;
pub(crate) mod time;

#[cfg(all(test, any(feature = "sync", feature = "future")))]
pub(crate) mod test_utils;

// Note: `CacheRegion` cannot have more than four enum variants. This is because
// `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull<DeqNode<T>, 2>`
// pointer, where the 2-bit tag is `CacheRegion`.
Expand Down
81 changes: 81 additions & 0 deletions src/common/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};

#[derive(Debug, Default)]
pub(crate) struct Counters {
inserted: AtomicU32,
evicted: AtomicU32,
invalidated: AtomicU32,
value_created: AtomicU32,
value_dropped: AtomicU32,
}

impl Counters {
pub(crate) fn inserted(&self) -> u32 {
self.inserted.load(Ordering::Acquire)
}

pub(crate) fn evicted(&self) -> u32 {
self.evicted.load(Ordering::Acquire)
}

pub(crate) fn invalidated(&self) -> u32 {
self.invalidated.load(Ordering::Acquire)
}

pub(crate) fn value_created(&self) -> u32 {
self.value_created.load(Ordering::Acquire)
}

pub(crate) fn value_dropped(&self) -> u32 {
self.value_dropped.load(Ordering::Acquire)
}

pub(crate) fn incl_inserted(&self) {
self.inserted.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_evicted(&self) {
self.evicted.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_invalidated(&self) {
self.invalidated.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_value_created(&self) {
self.value_created.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_value_dropped(&self) {
self.value_dropped.fetch_add(1, Ordering::AcqRel);
}
}

#[derive(Debug)]
pub(crate) struct Value {
// blob: Vec<u8>,
counters: Arc<Counters>,
}

impl Value {
pub(crate) fn new(_blob: Vec<u8>, counters: &Arc<Counters>) -> Self {
counters.incl_value_created();
Self {
// blob,
counters: Arc::clone(counters),
}
}

// pub(crate) fn blob(&self) -> &[u8] {
// &self.blob
// }
}

impl Drop for Value {
fn drop(&mut self) {
self.counters.incl_value_dropped();
}
}
2 changes: 1 addition & 1 deletion src/dash/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ where
time_to_idle: Option<Duration>,
) -> Self {
let initial_capacity = initial_capacity
.map(|cap| cap + WRITE_LOG_SIZE * 4)
.map(|cap| cap + WRITE_LOG_SIZE)
.unwrap_or_default();
let cache =
dashmap::DashMap::with_capacity_and_hasher(initial_capacity, build_hasher.clone());
Expand Down
107 changes: 106 additions & 1 deletion src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ where
Self::schedule_write_op(&self.base.write_op_ch, op, hk)
.await
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
}
}

Expand Down Expand Up @@ -1201,6 +1202,7 @@ where
.await;
self.value_initializer
.remove_waiter(&key, TypeId::of::<()>());
crossbeam_epoch::pin().flush();
v
}
InitResult::ReadExisting(v) => v,
Expand Down Expand Up @@ -1233,10 +1235,14 @@ where
.await;
self.value_initializer
.remove_waiter(&key, TypeId::of::<E>());
crossbeam_epoch::pin().flush();
Ok(v)
}
InitResult::ReadExisting(v) => Ok(v),
InitResult::InitErr(e) => Err(e),
InitResult::InitErr(e) => {
crossbeam_epoch::pin().flush();
Err(e)
}
}
}

Expand Down Expand Up @@ -2762,6 +2768,105 @@ mod tests {
cache.invalidate(key_s).await;
}

#[tokio::test]
async fn drop_value_immediately_after_eviction() {
use crate::common::test_utils::{Counters, Value};

const MAX_CAPACITY: u32 = 500;
const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

let counters = Arc::new(Counters::default());
let counters1 = Arc::clone(&counters);

let listener = move |_k, _v, cause| match cause {
RemovalCause::Size => counters1.incl_evicted(),
RemovalCause::Explicit => counters1.incl_invalidated(),
_ => (),
};

let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY as u64)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();

// Make the cache exterior immutable.
let cache = cache;

for key in 0..KEYS {
let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
cache.insert(key, value).await;
counters.incl_inserted();
cache.sync();
}

let eviction_count = KEYS - MAX_CAPACITY;

// Retries will be needed when testing in a QEMU VM.
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));

if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
assert_eq!(
counters.value_dropped(),
eviction_count,
"Retries exhausted"
);
}
}

assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), 0, "invalidated");
assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");

break;
}

for key in 0..KEYS {
cache.invalidate(&key).await;
cache.sync();
}

let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));

if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
}
}

assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");

break;
}

std::mem::drop(cache);
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}

#[tokio::test]
async fn test_debug_format() {
let cache = Cache::new(10);
Expand Down
3 changes: 3 additions & 0 deletions src/notification/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ impl<K, V> NotificationTask<K, V> {

// Safety: It is safe to assert unwind safety here because we will not
// call the listener again if it has been panicked.
//
#[allow(clippy::let_and_return)]
// https://rust-lang.github.io/rust-clippy/master/index.html#let_and_return
let result = catch_unwind(AssertUnwindSafe(listener_clo));
#[cfg(feature = "logging")]
{
Expand Down
Loading

0 comments on commit f3b5d7e

Please sign in to comment.