Skip to content

Commit

Permalink
Fix a bug causing memory leak in future::Cache when get_with and
Browse files Browse the repository at this point in the history
friends methods are used

- Use the correct hash value for an internal waiters map in `future::Cache`.
- Also rename variable names for hash values in `sync::Cache` to prevent similar
  bugs.
  • Loading branch information
tatsuya6502 committed Oct 2, 2023
1 parent 0b160cd commit fef6a77
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
61 changes: 38 additions & 23 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ where
S: BuildHasher,
{
is_waiter_value_set: bool,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
}
Expand All @@ -71,15 +71,15 @@ where
S: BuildHasher,
{
fn new(
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiters: TrioArc<WaiterMap<K, V, S>>,
write_lock: RwLockWriteGuard<'a, WaiterValue<V>>,
) -> Self {
Self {
is_waiter_value_set: false,
cht_key,
hash,
w_key,
w_hash,
waiters,
write_lock,
}
Expand All @@ -103,7 +103,7 @@ where
// has been aborted. Remove our waiter to prevent the issue described in
// https://github.com/moka-rs/moka/issues/59
*self.write_lock = WaiterValue::EnclosingFutureAborted;
remove_waiter(&self.waiters, self.cht_key.clone(), self.hash);
remove_waiter(&self.waiters, self.w_key.clone(), self.w_hash);
self.is_waiter_value_set = true;
}
}
Expand Down Expand Up @@ -147,8 +147,8 @@ where
#[allow(clippy::too_many_arguments)]
pub(crate) async fn try_init_or_read<'a, C, I, O, E>(
&'a self,
key: &Arc<K>,
hash: u64,
c_key: &Arc<K>,
c_hash: u64,
type_id: TypeId,
cache: &C,
ignore_if: Arc<Mutex<Option<I>>>,
Expand All @@ -169,35 +169,35 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let cht_key = (Arc::clone(key), type_id);
let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing));
let lock = waiter.write().await;

match try_insert_waiter(&self.waiters, cht_key.clone(), hash, &waiter) {
match try_insert_waiter(&self.waiters, w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.

// Create a guard. This will ensure to remove our waiter when the
// enclosing future has been aborted:
// https://github.com/moka-rs/moka/issues/59
let mut waiter_guard = WaiterGuard::new(
cht_key.clone(),
hash,
w_key.clone(),
w_hash,
TrioArc::clone(&self.waiters),
lock,
);

// Check if the value has already been inserted by other thread.
if let Some(value) = cache
.get_without_recording(key, hash, ignore_if.lock().await.as_mut())
.get_without_recording(c_key, c_hash, ignore_if.lock().await.as_mut())
.await
{
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
waiter_guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -209,7 +209,7 @@ where
Ok(value) => {
let (waiter_val, init_res) = match post_init(value) {
Ok(value) => {
cache.insert(Arc::clone(key), hash, value.clone()).await;
cache.insert(Arc::clone(c_key), c_hash, value.clone()).await;
(
WaiterValue::Ready(Ok(value.clone())),
InitResult::Initialized(value),
Expand All @@ -224,14 +224,14 @@ where
}
};
waiter_guard.set_waiter_value(waiter_val);
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
waiter_guard.set_waiter_value(WaiterValue::InitFuturePanicked);
// Remove the waiter so that others can retry.
remove_waiter(&self.waiters, cht_key, hash);
remove_waiter(&self.waiters, w_key, w_hash);
resume_unwind(payload);
}
} // The lock will be unlocked here.
Expand Down Expand Up @@ -311,27 +311,42 @@ where
}

#[inline]
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, cht_key: (Arc<K>, TypeId), hash: u64)
fn remove_waiter<K, V, S>(waiter_map: &WaiterMap<K, V, S>, w_key: (Arc<K>, TypeId), w_hash: u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
waiter_map.remove(hash, |k| k == &cht_key);
waiter_map.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>>
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let waiter = TrioArc::clone(waiter);
waiter_map.insert_if_not_present(cht_key, hash, waiter)
waiter_map.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn waiter_key_hash<K, V, S>(
waiter_map: &WaiterMap<K, V, S>,
c_key: &Arc<K>,
type_id: TypeId,
) -> ((Arc<K>, TypeId), u64)
where
(Arc<K>, TypeId): Eq + Hash,
S: BuildHasher,
{
let w_key = (Arc::clone(c_key), type_id);
let w_hash = waiter_map.hash(&w_key);
(w_key, w_hash)
}

fn panic_if_retry_exhausted_for_panicking(retries: usize, max: usize) {
Expand Down
28 changes: 14 additions & 14 deletions src/sync/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,21 +68,21 @@ where
const MAX_RETRIES: usize = 200;
let mut retries = 0;

let (cht_key, hash) = self.cht_key_hash(key, type_id);
let (w_key, w_hash) = self.waiter_key_hash(key, type_id);

loop {
let waiter = TrioArc::new(RwLock::new(None));
let mut lock = waiter.write();

match self.try_insert_waiter(cht_key.clone(), hash, &waiter) {
match self.try_insert_waiter(w_key.clone(), w_hash, &waiter) {
None => {
// Our waiter was inserted.
// Check if the value has already been inserted by other thread.
if let Some(value) = get() {
// Yes. Set the waiter value, remove our waiter, and return
// the existing value.
*lock = Some(Ok(value.clone()));
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return InitResult::ReadExisting(value);
}

Expand All @@ -106,14 +106,14 @@ where
}
};
*lock = waiter_val;
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
return init_res;
}
// Panicked.
Err(payload) => {
*lock = None;
// Remove the waiter so that others can retry.
self.remove_waiter(cht_key, hash);
self.remove_waiter(w_key, w_hash);
resume_unwind(payload);
}
} // The write lock will be unlocked here.
Expand Down Expand Up @@ -184,25 +184,25 @@ where
}

#[inline]
fn remove_waiter(&self, cht_key: (Arc<K>, TypeId), hash: u64) {
self.waiters.remove(hash, |k| k == &cht_key);
fn remove_waiter(&self, w_key: (Arc<K>, TypeId), w_hash: u64) {
self.waiters.remove(w_hash, |k| k == &w_key);
}

#[inline]
fn try_insert_waiter(
&self,
cht_key: (Arc<K>, TypeId),
hash: u64,
w_key: (Arc<K>, TypeId),
w_hash: u64,
waiter: &Waiter<V>,
) -> Option<Waiter<V>> {
let waiter = TrioArc::clone(waiter);
self.waiters.insert_if_not_present(cht_key, hash, waiter)
self.waiters.insert_if_not_present(w_key, w_hash, waiter)
}

#[inline]
fn cht_key_hash(&self, key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let cht_key = (Arc::clone(key), type_id);
let hash = self.waiters.hash(&cht_key);
(cht_key, hash)
fn waiter_key_hash(&self, c_key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let w_key = (Arc::clone(c_key), type_id);
let w_hash = self.waiters.hash(&w_key);
(w_key, w_hash)
}
}

0 comments on commit fef6a77

Please sign in to comment.