Skip to content

Commit

Permalink
Merge pull request #23 from moka-rs/get-or-try-insert-return-type
Browse files Browse the repository at this point in the history
Change get_or_try_insert_with to return a concrete error type rather than a trait object
  • Loading branch information
tatsuya6502 authored Aug 9, 2021
2 parents a737009 + 226edad commit 76b06af
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 129 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Moka — Change Log

## Version 0.6.0 (Unreleased)

### Changed

- Change `get_or_try_insert_with` to return a concrete error type rather
than a trait object. ([#23][gh-pull-0023])


## Version 0.5.1

### Changed
Expand Down Expand Up @@ -81,6 +89,7 @@

[caffeine-git]: https://github.com/ben-manes/caffeine

[gh-pull-0023]: https://github.com/moka-rs/moka/pull/23/
[gh-pull-0022]: https://github.com/moka-rs/moka/pull/22/
[gh-pull-0020]: https://github.com/moka-rs/moka/pull/20/
[gh-pull-0019]: https://github.com/moka-rs/moka/pull/19/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.5.1"
version = "0.6.0"
authors = ["Tatsuya Kawano <tatsuya@hibaridb.org>"]
edition = "2018"

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
moka = "0.5"
moka = "0.6"
```

To use the asynchronous cache, enable a crate feature called "future".

```toml
[dependencies]
moka = { version = "0.5", features = ["future"] }
moka = { version = "0.6", features = ["future"] }
```


Expand Down Expand Up @@ -164,7 +164,7 @@ Here is a similar program to the previous example, but using asynchronous cache
// Cargo.toml
//
// [dependencies]
// moka = { version = "0.5", features = ["future"] }
// moka = { version = "0.6", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures = "0.3"
Expand Down
55 changes: 28 additions & 27 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{

use crossbeam_channel::{Sender, TrySendError};
use std::{
any::TypeId,
borrow::Borrow,
collections::hash_map::RandomState,
error::Error,
Expand Down Expand Up @@ -57,7 +58,7 @@ use std::{
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.5", features = ["future"] }
/// // moka = { version = "0.6", features = ["future"] }
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// // futures = "0.3"
///
Expand Down Expand Up @@ -290,19 +291,12 @@ where
///
/// This method prevents to resolve the init future multiple times on the same
/// key even if the method is concurrently called by many async tasks; only one
/// of the calls resolves its future, and other calls wait for that future to
/// complete.
#[allow(clippy::redundant_allocation)]
// https://rust-lang.github.io/rust-clippy/master/index.html#redundant_allocation
// `Arc<Box<dyn ..>>` in the return type creates an extra heap allocation.
// This will be addressed by Moka v0.6.0.
pub async fn get_or_try_insert_with<F>(
&self,
key: K,
init: F,
) -> Result<V, Arc<Box<dyn Error + Send + Sync + 'static>>>
/// of the calls resolves its future (as long as these futures return the same
/// error type), and other calls wait for that future to complete.
pub async fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, Box<dyn Error + Send + Sync + 'static>>>,
F: Future<Output = Result<V, E>>,
E: Error + Send + Sync + 'static,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
Expand Down Expand Up @@ -478,26 +472,24 @@ where
InitResult::Initialized(v) => {
self.insert_with_hash(Arc::clone(&key), hash, v.clone())
.await;
self.value_initializer.remove_waiter(&key);
self.value_initializer
.remove_waiter(&key, TypeId::of::<()>());
v
}
InitResult::ReadExisting(v) => v,
InitResult::InitErr(_) => unreachable!(),
}
}

#[allow(clippy::redundant_allocation)]
// https://rust-lang.github.io/rust-clippy/master/index.html#redundant_allocation
// `Arc<Box<dyn ..>>` in the return type creates an extra heap allocation.
// This will be addressed by Moka v0.6.0.
async fn get_or_try_insert_with_hash_and_fun<F>(
async fn get_or_try_insert_with_hash_and_fun<F, E>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Result<V, Arc<Box<dyn Error + Send + Sync + 'static>>>
) -> Result<V, Arc<E>>
where
F: Future<Output = Result<V, Box<dyn Error + Send + Sync + 'static>>>,
F: Future<Output = Result<V, E>>,
E: Error + Send + Sync + 'static,
{
if let Some(v) = self.base.get_with_hash(&key, hash) {
return Ok(v);
Expand All @@ -512,7 +504,8 @@ where
let hash = self.base.hash(&key);
self.insert_with_hash(Arc::clone(&key), hash, v.clone())
.await;
self.value_initializer.remove_waiter(&key);
self.value_initializer
.remove_waiter(&key, TypeId::of::<E>());
Ok(v)
}
InitResult::ReadExisting(v) => Ok(v),
Expand Down Expand Up @@ -1028,6 +1021,14 @@ mod tests {

#[tokio::test]
async fn get_or_try_insert_with() {
use std::sync::Arc;

#[derive(thiserror::Error, Debug)]
#[error("{}", _0)]
pub struct MyError(String);

type MyResult<T> = Result<T, Arc<MyError>>;

let cache = Cache::new(100);
const KEY: u32 = 0;

Expand All @@ -1044,7 +1045,7 @@ mod tests {
.get_or_try_insert_with(KEY, async {
// Wait for 300 ms and return an error.
Timer::after(Duration::from_millis(300)).await;
Err("task1 error".into())
Err(MyError("task1 error".into()))
})
.await;
assert!(v.is_err());
Expand All @@ -1060,7 +1061,7 @@ mod tests {
async move {
// Wait for 100 ms before calling `get_or_try_insert_with`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2
let v: MyResult<_> = cache2
.get_or_try_insert_with(KEY, async { unreachable!() })
.await;
assert!(v.is_err());
Expand All @@ -1077,7 +1078,7 @@ mod tests {
async move {
// Wait for 400 ms before calling `get_or_try_insert_with`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3
let v: MyResult<_> = cache3
.get_or_try_insert_with(KEY, async {
// Wait for 300 ms and return an Ok(&str) value.
Timer::after(Duration::from_millis(300)).await;
Expand All @@ -1096,7 +1097,7 @@ mod tests {
async move {
// Wait for 500 ms before calling `get_or_try_insert_with`.
Timer::after(Duration::from_millis(500)).await;
let v = cache4
let v: MyResult<_> = cache4
.get_or_try_insert_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
Expand All @@ -1113,7 +1114,7 @@ mod tests {
async move {
// Wait for 800 ms before calling `get_or_try_insert_with`.
Timer::after(Duration::from_millis(800)).await;
let v = cache5
let v: MyResult<_> = cache5
.get_or_try_insert_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
Expand Down
53 changes: 32 additions & 21 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
use async_lock::RwLock;
use std::{
any::{Any, TypeId},
error::Error,
future::Future,
hash::{BuildHasher, Hash},
sync::Arc,
};

type Waiter<V> = Arc<RwLock<Option<Result<V, Arc<Box<dyn Error + Send + Sync + 'static>>>>>>;
type ErrorObject = Arc<dyn Any + Send + Sync + 'static>;
type Waiter<V> = Arc<RwLock<Option<Result<V, ErrorObject>>>>;

#[allow(clippy::redundant_allocation)]
// https://rust-lang.github.io/rust-clippy/master/index.html#redundant_allocation
pub(crate) enum InitResult<V> {
pub(crate) enum InitResult<V, E> {
Initialized(V),
ReadExisting(V),
// This `Arc<Box<dyn ..>>` creates an extra heap allocation. This will be
// addressed by Moka v0.6.0.
InitErr(Arc<Box<dyn Error + Send + Sync + 'static>>),
InitErr(Arc<E>),
}

pub(crate) struct ValueInitializer<K, V, S> {
waiters: moka_cht::SegmentedHashMap<Arc<K>, Waiter<V>, S>,
// TypeId is the type ID of the concrete error type of generic type E in
// try_init_or_read(). We use the type ID as a part of the key to ensure that
// we can always downcast the trait object ErrorObject (in Waiter<V>) into
// its concrete type.
waiters: moka_cht::SegmentedHashMap<(Arc<K>, TypeId), Waiter<V>, S>,
}

impl<K, V, S> ValueInitializer<K, V, S>
Expand All @@ -34,16 +36,17 @@ where
}
}

pub(crate) async fn init_or_read<F>(&self, key: Arc<K>, init: F) -> InitResult<V>
pub(crate) async fn init_or_read<F>(&self, key: Arc<K>, init: F) -> InitResult<V, ()>
where
F: Future<Output = V>,
{
use InitResult::*;

let type_id = TypeId::of::<()>();
let waiter = Arc::new(RwLock::new(None));
let mut lock = waiter.write().await;

match self.try_insert_waiter(&key, &waiter) {
match self.try_insert_waiter(&key, type_id, &waiter) {
None => {
// Inserted. Resolve the init future.
let value = init.await;
Expand All @@ -62,16 +65,18 @@ where
}
}

pub(crate) async fn try_init_or_read<F>(&self, key: Arc<K>, init: F) -> InitResult<V>
pub(crate) async fn try_init_or_read<F, E>(&self, key: Arc<K>, init: F) -> InitResult<V, E>
where
F: Future<Output = Result<V, Box<dyn Error + Send + Sync + 'static>>>,
F: Future<Output = Result<V, E>>,
E: Error + Send + Sync + 'static,
{
use InitResult::*;

let type_id = TypeId::of::<E>();
let waiter = Arc::new(RwLock::new(None));
let mut lock = waiter.write().await;

match self.try_insert_waiter(&key, &waiter) {
match self.try_insert_waiter(&key, type_id, &waiter) {
None => {
// Inserted. Resolve the init future.
match init.await {
Expand All @@ -80,10 +85,10 @@ where
Initialized(value)
}
Err(e) => {
let err = Arc::new(e);
let err: ErrorObject = Arc::new(e);
*lock = Some(Err(Arc::clone(&err)));
self.remove_waiter(&key);
InitErr(err)
self.remove_waiter(&key, type_id);
InitErr(err.downcast().unwrap())
}
}
}
Expand All @@ -93,23 +98,29 @@ where
std::mem::drop(lock);
match &*res.read().await {
Some(Ok(value)) => ReadExisting(value.clone()),
Some(Err(e)) => InitErr(Arc::clone(e)),
Some(Err(e)) => InitErr(Arc::clone(e).downcast().unwrap()),
None => unreachable!(),
}
}
}
}

#[inline]
pub(crate) fn remove_waiter(&self, key: &Arc<K>) {
self.waiters.remove(key);
pub(crate) fn remove_waiter(&self, key: &Arc<K>, type_id: TypeId) {
let key = Arc::clone(key);
self.waiters.remove(&(key, type_id));
}

fn try_insert_waiter(&self, key: &Arc<K>, waiter: &Waiter<V>) -> Option<Waiter<V>> {
fn try_insert_waiter(
&self,
key: &Arc<K>,
type_id: TypeId,
waiter: &Waiter<V>,
) -> Option<Waiter<V>> {
let key = Arc::clone(key);
let waiter = Arc::clone(waiter);

self.waiters
.insert_with_or_modify(key, || waiter, |_, w| Arc::clone(w))
.insert_with_or_modify((key, type_id), || waiter, |_, w| Arc::clone(w))
}
}
Loading

0 comments on commit 76b06af

Please sign in to comment.