Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example called reinsert_expired_entries_sync #382

Merged
merged 8 commits into from
Jan 20, 2024
21 changes: 21 additions & 0 deletions .ci_extras/remove-examples-msrv.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash

# Disable examples from the MSRV build.

set -eux

function disable_example_sync() {
local example_name="$1"

mv ./examples/${example_name}.rs ./examples/${example_name}.rs.bak

# Replace the main function of example $1.
cat << EOF > ./examples/${example_name}.rs
fn main() {}
EOF

echo "Disabled $example_name."
}

# `OnceLock` was introduced in 1.70.0.
disable_example_sync reinsert_expired_entries_sync
4 changes: 4 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ jobs:
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/pin-crate-vers-msrv.sh

- name: Remove some examples (MSRV only)
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/remove-examples-msrv.sh

- name: Show cargo tree
uses: actions-rs/cargo@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/CIQuantaDisabled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ jobs:
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/pin-crate-vers-msrv.sh

- name: Remove some examples (MSRV only)
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/remove-examples-msrv.sh

- name: Run tests (debug, but no quanta feature)
uses: actions-rs/cargo@v1
with:
Expand Down
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
# Moka Cache &mdash; Change Log

## Version 0.12.4

### Added

- Added an example for reinserting expired entries. ([#382][gh-pull-0382])


## Version 0.12.3

### Added

- Added the upsert and compute methods for modifying a cached entry
([#370][gh-pull-0370]):
- Now the `entry` or `entry_by_ref` APIs have the following methods:
- Now the `entry` and `entry_by_ref` APIs have the following methods:
- `and_upsert_with` method to insert or update the entry.
- `and_compute_with` method to insert, update, remove or do nothing on the
entry.
Expand All @@ -18,7 +25,7 @@
- Raised the version requirement of the `quanta` from `>=0.11.0, <0.12.0` to
`>=0.12.2, <0.13.0` to avoid under-measuring the elapsed time on Apple silicon
Macs ([#376][gh-pull-0376]).
- Due to this under-measurement, cached entries expire sightly later than
- Due to this under-measurement, cached entries can expire sightly later than
expected on macOS arm64.


Expand Down Expand Up @@ -803,6 +810,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0382]: https://github.com/moka-rs/moka/pull/382/
[gh-pull-0376]: https://github.com/moka-rs/moka/pull/376/
[gh-pull-0370]: https://github.com/moka-rs/moka/pull/370/
[gh-pull-0363]: https://github.com/moka-rs/moka/pull/363/
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.3"
version = "0.12.4"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down Expand Up @@ -146,6 +146,10 @@ required-features = ["sync"]
name = "eviction_listener_sync"
required-features = ["sync"]

[[example]]
name = "reinsert_expired_entries_sync"
required-features = ["sync"]

[[example]]
name = "size_aware_eviction_sync"
required-features = ["sync"]
Expand Down
8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ existence of the entry.
- Beside the cache APIs, uses `BTreeMap`, `Arc` and mpsc channel (multi-producer,
single consumer channel).

- [reinsert_expired_entries_sync](./reinsert_expired_enties_sync.rs)
- Reinserts the expired entries into the cache using eviction listener and
worker threads.
- Spawns two worker threads; one for reinserting entries, and the other for
calling `run_pending_tasks`.
- Uses a mpsc channel (multi-producer, single consumer channel) to send commands
from the eviction listener to the first worker thread.

## Check out the API Documentation too!

The examples are not meant to be exhaustive. Please check the
Expand Down
152 changes: 152 additions & 0 deletions examples/reinsert_expired_entries_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! This example demonstrates how to write an eviction listener that will reinsert
//! the expired entries.
//!
//! We cannot make the eviction listener directly reinsert the entries, because it
//! will lead to a deadlock in some conditions. Instead, we will create a worker
//! thread to do the reinsertion, and create a mpsc channel to send commands from the
//! eviction listener to the worker thread.

use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Sender},
Arc, Mutex, OnceLock,
},
thread,
time::{Duration, Instant},
};

use moka::{notification::RemovalCause, sync::Cache};

/// The cache key type.
pub type Key = String;
/// The cache value type.
pub type Value = u32;

/// Command for the worker thread.
pub enum Command {
/// (Re)insert the entry with the given key and value.
Insert(Key, Value),
/// Shutdown the worker thread.
Shutdown,
}

fn main() {
// Create a multi-producer single-consumer (mpsc) channel to send commands
// from the eviction listener to the worker thread.
let (snd, rcv) = mpsc::channel();

// Wrap the Sender (snd) with a Mutex and set to a static OnceLock.
//
// Cache requires an eviction listener to be Sync as it will be executed by
// multiple threads. However the Sender (snd) of the channel is not Sync, so the
// eviction listener cannot capture the Sender directly.
//
// We are going to solve this by making the Sender globally accessible via the
// static OnceLock, and make the eviction listener to clone it per thread.
static SND: OnceLock<Mutex<Sender<Command>>> = OnceLock::new();
SND.set(Mutex::new(snd.clone())).unwrap();

// Create the eviction listener.
let listener = move |key: Arc<String>, value: u32, cause: RemovalCause| {
// Keep a clone of the Sender in our thread-local variable, so that we can
// send a command without locking the Mutex every time.
thread_local! {
static THREAD_SND: Sender<Command> = SND.get().unwrap().lock().unwrap().clone();
}

println!("{} was evicted. value: {} ({:?})", key, value, cause);

// If the entry was removed due to expiration, send a command to the channel
// to reinsert the entry with a modified value.
if cause == RemovalCause::Expired {
let new_value = value * 2;
let command = Command::Insert(key.to_string(), new_value);
THREAD_SND.with(|snd| snd.send(command).expect("Cannot send"));
}

// Do nothing if the entry was removed by one of the following reasons:
// - Reached to the capacity limit. (RemovalCause::Size)
// - Manually invalidated. (RemovalCause::Explicit)
};

const MAX_CAPACITY: u64 = 7;
const TTL: Duration = Duration::from_secs(3);

// Create a cache with the max capacity, time-to-live and the eviction listener.
let cache = Arc::new(
Cache::builder()
.max_capacity(MAX_CAPACITY)
.time_to_live(TTL)
.eviction_listener(listener)
.build(),
);

// Spawn the worker thread that receives commands from the channel and reinserts
// the entries.
let worker1 = {
let cache = Arc::clone(&cache);

thread::spawn(move || {
// Repeat until receiving a shutdown command.
loop {
match rcv.recv() {
Ok(Command::Insert(key, value)) => {
println!("Reinserting {} with value {}.", key, value);
cache.insert(key, value);
}
Ok(Command::Shutdown) => break,
Err(e) => {
eprintln!("Cannot receive a command: {:?}", e);
break;
}
}
}

println!("Shutdown the worker thread.");
})
};

// Spawn another worker thread that calls `cache.run_pending_tasks()` every 300
// milliseconds.
let shutdown = Arc::new(AtomicBool::new(false));
let worker2 = {
let cache = Arc::clone(&cache);
let shutdown = Arc::clone(&shutdown);

thread::spawn(move || {
let interval = Duration::from_millis(300);
let mut sleep_duration = interval;

// Repeat until the shutdown latch is set.
while !shutdown.load(Ordering::Relaxed) {
thread::sleep(sleep_duration);
let start = Instant::now();
cache.run_pending_tasks();
sleep_duration = interval.saturating_sub(start.elapsed());
}
})
};

// Insert 9 entries.
// - The last 2 entries will be evicted due to the capacity limit.
// - The remaining 7 entries will be evicted after 3 seconds, and then the worker
// thread will reinsert them with modified values.
for i in 1..=9 {
thread::sleep(Duration::from_millis(100));
let key = i.to_string();
let value = i;
println!("Inserting {} with value {}.", key, value);
cache.insert(key, value);
}

// Wait for 8 seconds.
thread::sleep(Duration::from_secs(8));

// Shutdown the worker threads.
snd.send(Command::Shutdown).expect("Cannot send");
worker1.join().expect("The worker thread 1 panicked");

shutdown.store(true, Ordering::Relaxed);
worker2.join().expect("The worker thread 2 panicked");
}
Loading