Skip to content

Commit

Permalink
Merge pull request #30 from moka-rs/get-or-insert-example
Browse files Browse the repository at this point in the history
Add examples to the doc for `get_or_insert_with` and `get_or_try_insert_with` methods
  • Loading branch information
tatsuya6502 authored Sep 4, 2021
2 parents 1bf28ed + 012ade9 commit 6be9ce8
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 13 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"nocapture",
"peekable",
"preds",
"reqwest",
"runtimes",
"rustdoc",
"rustfmt",
Expand Down
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

### Fixed

- `usize` overflow on big cache capacity. ([#28][gh-pull-0028])
- Fix `usize` overflow on big cache capacity. ([#28][gh-pull-0028])

### Added

- Add examples for `get_or_insert_with` and `get_or_try_insert_with`
methods to the docs. ([#30][gh-pull-0030])


## Version 0.5.1
Expand Down Expand Up @@ -88,6 +93,7 @@

[caffeine-git]: https://github.com/ben-manes/caffeine

[gh-pull-0030]: https://github.com/moka-rs/moka/pull/30/
[gh-pull-0028]: https://github.com/moka-rs/moka/pull/28/
[gh-pull-0022]: https://github.com/moka-rs/moka/pull/22/
[gh-pull-0020]: https://github.com/moka-rs/moka/pull/20/
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ actix-rt1 = { package = "actix-rt", version = "1", default-features = false }
async-std = { version = "1", default-features = false, features = ["attributes"] }
futures = "0.3"
getrandom = "0.2"
reqwest = "0.11"
skeptic = "0.13"
tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }

Expand Down
143 changes: 143 additions & 0 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,72 @@ where
/// key even if the method is concurrently called by many async tasks; only one
/// of the calls resolves its future, and other calls wait for that future to
/// complete.
///
/// # Example
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.5", features = ["future"] }
/// // futures = "0.3"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Insert and get the value for key1. Although all four async tasks
/// // will call `get_or_insert_with` at the same time, the `init` async
/// // block must be resolved only once.
/// let value = my_cache
/// .get_or_insert_with("key1", async {
/// println!("Task {} inserting a value.", task_id);
/// Arc::new(vec![0u8; TEN_MIB])
/// })
/// .await;
///
/// // Ensure the value exists now.
/// assert_eq!(value.len(), TEN_MIB);
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!("Task {} got the value. (len: {})", task_id, value.len());
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - The async black resolved exactly once by task 3.
/// - Other tasks were blocked until task 3 inserted the value.
///
/// ```console
/// Task 0 started.
/// Task 3 started.
/// Task 1 started.
/// Task 2 started.
/// Task 3 inserting a value.
/// Task 3 got the value. (len: 10485760)
/// Task 0 got the value. (len: 10485760)
/// Task 1 got the value. (len: 10485760)
/// Task 2 got the value. (len: 10485760)
/// ```
///
pub async fn get_or_insert_with(&self, key: K, init: impl Future<Output = V>) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
Expand All @@ -292,6 +358,83 @@ where
/// key even if the method is concurrently called by many async tasks; only one
/// of the calls resolves its future, and other calls wait for that future to
/// complete.
///
/// # Example
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.5", features = ["future"] }
/// // futures = "0.3"
/// // reqwest = "0.11"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// // This async function tries to get HTML from the given URI.
/// async fn get_html(
/// task_id: u8,
/// uri: &str,
/// ) -> Result<String, Box<dyn std::error::Error + Send + Sync + 'static>> {
/// println!("get_html() called by task {}.", task_id);
/// Ok(reqwest::get(uri).await?.text().await?)
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Try to insert and get the value for key1. Although
/// // all four async tasks will call `get_or_try_insert_with`
/// // at the same time, get_html() must be called only once.
/// let value = my_cache
/// .get_or_try_insert_with(
/// "key1",
/// get_html(task_id, "https://www.rust-lang.org"),
/// ).await;
///
/// // Ensure the value exists now.
/// assert!(value.is_ok());
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!(
/// "Task {} got the value. (len: {})",
/// task_id,
/// value.unwrap().len()
/// );
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - `get_html()` called exactly once by task 2.
/// - Other tasks were blocked until task 2 inserted the value.
///
/// ```console
/// Task 1 started.
/// Task 0 started.
/// Task 2 started.
/// Task 3 started.
/// get_html() called by task 2.
/// Task 2 got the value. (len: 19419)
/// Task 1 got the value. (len: 19419)
/// Task 0 got the value. (len: 19419)
/// Task 3 got the value. (len: 19419)
/// ```
///
#[allow(clippy::redundant_allocation)]
// https://rust-lang.github.io/rust-clippy/master/index.html#redundant_allocation
// `Arc<Box<dyn ..>>` in the return type creates an extra heap allocation.
Expand Down
12 changes: 6 additions & 6 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ where

match self.try_insert_waiter(&key, &waiter) {
None => {
// Inserted. Resolve the init future.
// Our waiter was inserted. Let's resolve the init future.
let value = init.await;
*lock = Some(Ok(value.clone()));
Initialized(value)
}
Some(res) => {
// Value already exists. Drop our write lock and wait for a read lock
// to become available.
// Somebody else's waiter already exists. Drop our write lock and wait
// for a read lock to become available.
std::mem::drop(lock);
match &*res.read().await {
Some(Ok(value)) => ReadExisting(value.clone()),
Expand All @@ -73,7 +73,7 @@ where

match self.try_insert_waiter(&key, &waiter) {
None => {
// Inserted. Resolve the init future.
// Our waiter was inserted. Let's resolve the init future.
match init.await {
Ok(value) => {
*lock = Some(Ok(value.clone()));
Expand All @@ -88,8 +88,8 @@ where
}
}
Some(res) => {
// Value already exists. Drop our write lock and wait for a read lock
// to become available.
// Somebody else's waiter already exists. Drop our write lock and wait
// for a read lock to become available.
std::mem::drop(lock);
match &*res.read().await {
Some(Ok(value)) => ReadExisting(value.clone()),
Expand Down
128 changes: 128 additions & 0 deletions src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,64 @@ where
/// key even if the method is concurrently called by many threads; only one of
/// the calls evaluates its function, and other calls wait for that function to
/// complete.
///
/// # Example
///
/// ```rust
/// use moka::sync::Cache;
/// use std::{sync::Arc, thread, time::Duration};
///
/// const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
/// let cache = Cache::new(100);
///
/// // Spawn four threads.
/// let threads: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// thread::spawn(move || {
/// println!("Thread {} started.", task_id);
///
/// // Try to insert and get the value for key1. Although all four
/// // threads will call `get_or_insert_with` at the same time, the
/// // `init` closure must be evaluated only once.
/// let value = my_cache.get_or_insert_with("key1", || {
/// println!("Thread {} inserting a value.", task_id);
/// Arc::new(vec![0u8; TEN_MIB])
/// });
///
/// // Ensure the value exists now.
/// assert_eq!(value.len(), TEN_MIB);
/// thread::sleep(Duration::from_millis(10));
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!("Thread {} got the value. (len: {})", task_id, value.len());
/// })
/// })
/// .collect();
///
/// // Wait all threads to complete.
/// threads
/// .into_iter()
/// .for_each(|t| t.join().expect("Thread failed"));
/// ```
///
/// **Result**
///
/// - The `init` closure called exactly once by thread 1.
/// - Other threads were blocked until thread 1 inserted the value.
///
/// ```console
/// Thread 1 started.
/// Thread 0 started.
/// Thread 3 started.
/// Thread 2 started.
/// Thread 1 inserting a value.
/// Thread 2 got the value. (len: 10485760)
/// Thread 1 got the value. (len: 10485760)
/// Thread 0 got the value. (len: 10485760)
/// Thread 3 got the value. (len: 10485760)
/// ```
///
pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V {
let hash = self.base.hash(&key);
let key = Arc::new(key);
Expand Down Expand Up @@ -293,6 +351,76 @@ where
/// key even if the method is concurrently called by many threads; only one of
/// the calls evaluates its function, and other calls wait for that function to
/// complete.
///
/// # Example
///
/// ```rust
/// use moka::sync::Cache;
/// use std::{path::Path, time::Duration, thread};
///
/// /// This function tries to get the file size in bytes.
/// fn get_file_size(
/// thread_id: u8,
/// path: impl AsRef<Path>,
/// ) -> Result<u64, Box<dyn std::error::Error + Send + Sync + 'static>> {
/// println!("get_file_size() called by thread {}.", thread_id);
/// Ok(std::fs::metadata(path)?.len())
/// }
///
/// let cache = Cache::new(100);
///
/// // Spawn four threads.
/// let threads: Vec<_> = (0..4_u8)
/// .map(|thread_id| {
/// let my_cache = cache.clone();
/// thread::spawn(move || {
/// println!("Thread {} started.", thread_id);
///
/// // Try to insert and get the value for key1. Although all four
/// // threads will call `get_or_try_insert_with` at the same time,
/// // get_file_size() must be called only once.
/// let value = my_cache.get_or_try_insert_with(
/// "key1",
/// || get_file_size(thread_id, "./Cargo.toml"),
/// );
///
/// // Ensure the value exists now.
/// assert!(value.is_ok());
/// thread::sleep(Duration::from_millis(10));
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!(
/// "Thread {} got the value. (len: {})",
/// thread_id,
/// value.unwrap()
/// );
/// })
/// })
/// .collect();
///
/// // Wait all threads to complete.
/// threads
/// .into_iter()
/// .for_each(|t| t.join().expect("Thread failed"));
/// ```
///
/// **Result**
///
/// - `get_file_size()` called exactly once by thread 1.
/// - Other threads were blocked until thread 1 inserted the value.
///
/// ```console
/// Thread 1 started.
/// Thread 2 started.
/// get_file_size() called by thread 1.
/// Thread 3 started.
/// Thread 0 started.
/// Thread 2 got the value. (len: 1466)
/// Thread 0 got the value. (len: 1466)
/// Thread 1 got the value. (len: 1466)
/// Thread 3 got the value. (len: 1466)
/// ```
///
#[allow(clippy::redundant_allocation)]
// https://rust-lang.github.io/rust-clippy/master/index.html#redundant_allocation
// `Arc<Box<dyn ..>>` in the return type creates an extra heap allocation.
Expand Down
Loading

0 comments on commit 6be9ce8

Please sign in to comment.