Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop evicted entries immediately #169

Merged
merged 14 commits into from
Jul 24, 2022
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Moka Cache — Change Log

## Version 0.9.3

### Fixed

- Ensure that the following caches will drop the value of evicted entries immediately
after eviction ([#169][gh-pull-0169]):
- `sync::Cache`
- `sync::SegmentedCache`
- `future::Cache`


## Version 0.9.2

### Fixed
Expand Down Expand Up @@ -450,6 +461,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0167]: https://github.com/moka-rs/moka/pull/169/
[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "moka"
version = "0.9.2"
version = "0.9.3"
edition = "2018"
rust-version = "1.51"

description = "A fast and concurrent cache library inspired by Caffeine (Java)"
description = "A fast and concurrent cache library inspired by Java Caffeine"
license = "MIT OR Apache-2.0"
# homepage = "https://"
documentation = "https://docs.rs/moka/"
Expand Down
2 changes: 2 additions & 0 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}
}
}

guard.flush();
std::mem::drop(lock);

Some(next_array)
Expand Down
3 changes: 3 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub(crate) mod deque;
pub(crate) mod frequency_sketch;
pub(crate) mod time;

#[cfg(all(test, any(feature = "sync", feature = "future")))]
pub(crate) mod test_utils;

// Note: `CacheRegion` cannot have more than four enum variants. This is because
// `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull<DeqNode<T>, 2>`
// pointer, where the 2-bit tag is `CacheRegion`.
Expand Down
81 changes: 81 additions & 0 deletions src/common/test_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};

#[derive(Debug, Default)]
pub(crate) struct Counters {
inserted: AtomicU32,
evicted: AtomicU32,
invalidated: AtomicU32,
value_created: AtomicU32,
value_dropped: AtomicU32,
}

impl Counters {
pub(crate) fn inserted(&self) -> u32 {
self.inserted.load(Ordering::Acquire)
}

pub(crate) fn evicted(&self) -> u32 {
self.evicted.load(Ordering::Acquire)
}

pub(crate) fn invalidated(&self) -> u32 {
self.invalidated.load(Ordering::Acquire)
}

pub(crate) fn value_created(&self) -> u32 {
self.value_created.load(Ordering::Acquire)
}

pub(crate) fn value_dropped(&self) -> u32 {
self.value_dropped.load(Ordering::Acquire)
}

pub(crate) fn incl_inserted(&self) {
self.inserted.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_evicted(&self) {
self.evicted.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_invalidated(&self) {
self.invalidated.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_value_created(&self) {
self.value_created.fetch_add(1, Ordering::AcqRel);
}

pub(crate) fn incl_value_dropped(&self) {
self.value_dropped.fetch_add(1, Ordering::AcqRel);
}
}

#[derive(Debug)]
pub(crate) struct Value {
// blob: Vec<u8>,
counters: Arc<Counters>,
}

impl Value {
pub(crate) fn new(_blob: Vec<u8>, counters: &Arc<Counters>) -> Self {
counters.incl_value_created();
Self {
// blob,
counters: Arc::clone(counters),
}
}

// pub(crate) fn blob(&self) -> &[u8] {
// &self.blob
// }
}

impl Drop for Value {
fn drop(&mut self) {
self.counters.incl_value_dropped();
}
}
2 changes: 1 addition & 1 deletion src/dash/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ where
time_to_idle: Option<Duration>,
) -> Self {
let initial_capacity = initial_capacity
.map(|cap| cap + WRITE_LOG_SIZE * 4)
.map(|cap| cap + WRITE_LOG_SIZE)
.unwrap_or_default();
let cache =
dashmap::DashMap::with_capacity_and_hasher(initial_capacity, build_hasher.clone());
Expand Down
107 changes: 106 additions & 1 deletion src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ where
Self::schedule_write_op(&self.base.write_op_ch, op, hk)
.await
.expect("Failed to remove");
crossbeam_epoch::pin().flush();
}
}

Expand Down Expand Up @@ -1201,6 +1202,7 @@ where
.await;
self.value_initializer
.remove_waiter(&key, TypeId::of::<()>());
crossbeam_epoch::pin().flush();
v
}
InitResult::ReadExisting(v) => v,
Expand Down Expand Up @@ -1233,10 +1235,14 @@ where
.await;
self.value_initializer
.remove_waiter(&key, TypeId::of::<E>());
crossbeam_epoch::pin().flush();
Ok(v)
}
InitResult::ReadExisting(v) => Ok(v),
InitResult::InitErr(e) => Err(e),
InitResult::InitErr(e) => {
crossbeam_epoch::pin().flush();
Err(e)
}
}
}

Expand Down Expand Up @@ -2762,6 +2768,105 @@ mod tests {
cache.invalidate(key_s).await;
}

#[tokio::test]
async fn drop_value_immediately_after_eviction() {
use crate::common::test_utils::{Counters, Value};

const MAX_CAPACITY: u32 = 500;
const KEYS: u32 = ((MAX_CAPACITY as f64) * 1.2) as u32;

let counters = Arc::new(Counters::default());
let counters1 = Arc::clone(&counters);

let listener = move |_k, _v, cause| match cause {
RemovalCause::Size => counters1.incl_evicted(),
RemovalCause::Explicit => counters1.incl_invalidated(),
_ => (),
};

let mut cache = Cache::builder()
.max_capacity(MAX_CAPACITY as u64)
.eviction_listener_with_queued_delivery_mode(listener)
.build();
cache.reconfigure_for_testing();

// Make the cache exterior immutable.
let cache = cache;

for key in 0..KEYS {
let value = Arc::new(Value::new(vec![0u8; 1024], &counters));
cache.insert(key, value).await;
counters.incl_inserted();
cache.sync();
}

let eviction_count = KEYS - MAX_CAPACITY;

// Retries will be needed when testing in a QEMU VM.
const MAX_RETRIES: usize = 5;
let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));

if counters.evicted() != eviction_count || counters.value_dropped() != eviction_count {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.evicted(), eviction_count, "Retries exhausted");
assert_eq!(
counters.value_dropped(),
eviction_count,
"Retries exhausted"
);
}
}

assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), 0, "invalidated");
assert_eq!(counters.value_dropped(), eviction_count, "value_dropped");

break;
}

for key in 0..KEYS {
cache.invalidate(&key).await;
cache.sync();
}

let mut retries = 0;
loop {
// Ensure all scheduled notifications have been processed.
std::thread::sleep(Duration::from_millis(500));

if counters.invalidated() != MAX_CAPACITY || counters.value_dropped() != KEYS {
if retries <= MAX_RETRIES {
retries += 1;
cache.sync();
continue;
} else {
assert_eq!(counters.invalidated(), MAX_CAPACITY, "Retries exhausted");
assert_eq!(counters.value_dropped(), KEYS, "Retries exhausted");
}
}

assert_eq!(counters.inserted(), KEYS, "inserted");
assert_eq!(counters.value_created(), KEYS, "value_created");
assert_eq!(counters.evicted(), eviction_count, "evicted");
assert_eq!(counters.invalidated(), MAX_CAPACITY, "invalidated");
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");

break;
}

std::mem::drop(cache);
assert_eq!(counters.value_dropped(), KEYS, "value_dropped");
}

#[tokio::test]
async fn test_debug_format() {
let cache = Cache::new(10);
Expand Down
3 changes: 3 additions & 0 deletions src/notification/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ impl<K, V> NotificationTask<K, V> {

// Safety: It is safe to assert unwind safety here because we will not
// call the listener again if it has been panicked.
//
#[allow(clippy::let_and_return)]
// https://rust-lang.github.io/rust-clippy/master/index.html#let_and_return
let result = catch_unwind(AssertUnwindSafe(listener_clo));
#[cfg(feature = "logging")]
{
Expand Down
Loading