Skip to content

Commit

Permalink
Merge pull request #382 from moka-rs/reinsert-example
Browse files Browse the repository at this point in the history
Add an example called `reinsert_expired_entries_sync`
  • Loading branch information
tatsuya6502 authored Jan 20, 2024
2 parents a8c6845 + 46c5f11 commit 521193a
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 3 deletions.
21 changes: 21 additions & 0 deletions .ci_extras/remove-examples-msrv.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash

# Disable examples from the MSRV build.

set -eux

function disable_example_sync() {
local example_name="$1"

mv ./examples/${example_name}.rs ./examples/${example_name}.rs.bak

# Replace the main function of example $1.
cat << EOF > ./examples/${example_name}.rs
fn main() {}
EOF

echo "Disabled $example_name."
}

# `OnceLock` was introduced in 1.70.0.
disable_example_sync reinsert_expired_entries_sync
4 changes: 4 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ jobs:
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/pin-crate-vers-msrv.sh

- name: Remove some examples (MSRV only)
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/remove-examples-msrv.sh

- name: Show cargo tree
uses: actions-rs/cargo@v1
with:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/CIQuantaDisabled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ jobs:
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/pin-crate-vers-msrv.sh

- name: Remove some examples (MSRV only)
if: ${{ matrix.rust == '1.65.0' }}
run: ./.ci_extras/remove-examples-msrv.sh

- name: Run tests (debug, but no quanta feature)
uses: actions-rs/cargo@v1
with:
Expand Down
12 changes: 10 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
# Moka Cache &mdash; Change Log

## Version 0.12.4

### Added

- Added an example for reinserting expired entries. ([#382][gh-pull-0382])


## Version 0.12.3

### Added

- Added the upsert and compute methods for modifying a cached entry
([#370][gh-pull-0370]):
- Now the `entry` or `entry_by_ref` APIs have the following methods:
- Now the `entry` and `entry_by_ref` APIs have the following methods:
- `and_upsert_with` method to insert or update the entry.
- `and_compute_with` method to insert, update, remove or do nothing on the
entry.
Expand All @@ -18,7 +25,7 @@
- Raised the version requirement of the `quanta` from `>=0.11.0, <0.12.0` to
`>=0.12.2, <0.13.0` to avoid under-measuring the elapsed time on Apple silicon
Macs ([#376][gh-pull-0376]).
- Due to this under-measurement, cached entries expire sightly later than
- Due to this under-measurement, cached entries can expire sightly later than
expected on macOS arm64.


Expand Down Expand Up @@ -803,6 +810,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0382]: https://github.com/moka-rs/moka/pull/382/
[gh-pull-0376]: https://github.com/moka-rs/moka/pull/376/
[gh-pull-0370]: https://github.com/moka-rs/moka/pull/370/
[gh-pull-0363]: https://github.com/moka-rs/moka/pull/363/
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.3"
version = "0.12.4"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down Expand Up @@ -146,6 +146,10 @@ required-features = ["sync"]
name = "eviction_listener_sync"
required-features = ["sync"]

[[example]]
name = "reinsert_expired_entries_sync"
required-features = ["sync"]

[[example]]
name = "size_aware_eviction_sync"
required-features = ["sync"]
Expand Down
8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ existence of the entry.
- Beside the cache APIs, uses `BTreeMap`, `Arc` and mpsc channel (multi-producer,
single consumer channel).

- [reinsert_expired_entries_sync](./reinsert_expired_enties_sync.rs)
- Reinserts the expired entries into the cache using eviction listener and
worker threads.
- Spawns two worker threads; one for reinserting entries, and the other for
calling `run_pending_tasks`.
- Uses a mpsc channel (multi-producer, single consumer channel) to send commands
from the eviction listener to the first worker thread.

## Check out the API Documentation too!

The examples are not meant to be exhaustive. Please check the
Expand Down
152 changes: 152 additions & 0 deletions examples/reinsert_expired_entries_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//! This example demonstrates how to write an eviction listener that will reinsert
//! the expired entries.
//!
//! We cannot make the eviction listener directly reinsert the entries, because it
//! will lead to a deadlock in some conditions. Instead, we will create a worker
//! thread to do the reinsertion, and create a mpsc channel to send commands from the
//! eviction listener to the worker thread.
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Sender},
Arc, Mutex, OnceLock,
},
thread,
time::{Duration, Instant},
};

use moka::{notification::RemovalCause, sync::Cache};

/// The cache key type.
pub type Key = String;
/// The cache value type.
pub type Value = u32;

/// Command for the worker thread.
pub enum Command {
/// (Re)insert the entry with the given key and value.
Insert(Key, Value),
/// Shutdown the worker thread.
Shutdown,
}

fn main() {
// Create a multi-producer single-consumer (mpsc) channel to send commands
// from the eviction listener to the worker thread.
let (snd, rcv) = mpsc::channel();

// Wrap the Sender (snd) with a Mutex and set to a static OnceLock.
//
// Cache requires an eviction listener to be Sync as it will be executed by
// multiple threads. However the Sender (snd) of the channel is not Sync, so the
// eviction listener cannot capture the Sender directly.
//
// We are going to solve this by making the Sender globally accessible via the
// static OnceLock, and make the eviction listener to clone it per thread.
static SND: OnceLock<Mutex<Sender<Command>>> = OnceLock::new();
SND.set(Mutex::new(snd.clone())).unwrap();

// Create the eviction listener.
let listener = move |key: Arc<String>, value: u32, cause: RemovalCause| {
// Keep a clone of the Sender in our thread-local variable, so that we can
// send a command without locking the Mutex every time.
thread_local! {
static THREAD_SND: Sender<Command> = SND.get().unwrap().lock().unwrap().clone();
}

println!("{} was evicted. value: {} ({:?})", key, value, cause);

// If the entry was removed due to expiration, send a command to the channel
// to reinsert the entry with a modified value.
if cause == RemovalCause::Expired {
let new_value = value * 2;
let command = Command::Insert(key.to_string(), new_value);
THREAD_SND.with(|snd| snd.send(command).expect("Cannot send"));
}

// Do nothing if the entry was removed by one of the following reasons:
// - Reached to the capacity limit. (RemovalCause::Size)
// - Manually invalidated. (RemovalCause::Explicit)
};

const MAX_CAPACITY: u64 = 7;
const TTL: Duration = Duration::from_secs(3);

// Create a cache with the max capacity, time-to-live and the eviction listener.
let cache = Arc::new(
Cache::builder()
.max_capacity(MAX_CAPACITY)
.time_to_live(TTL)
.eviction_listener(listener)
.build(),
);

// Spawn the worker thread that receives commands from the channel and reinserts
// the entries.
let worker1 = {
let cache = Arc::clone(&cache);

thread::spawn(move || {
// Repeat until receiving a shutdown command.
loop {
match rcv.recv() {
Ok(Command::Insert(key, value)) => {
println!("Reinserting {} with value {}.", key, value);
cache.insert(key, value);
}
Ok(Command::Shutdown) => break,
Err(e) => {
eprintln!("Cannot receive a command: {:?}", e);
break;
}
}
}

println!("Shutdown the worker thread.");
})
};

// Spawn another worker thread that calls `cache.run_pending_tasks()` every 300
// milliseconds.
let shutdown = Arc::new(AtomicBool::new(false));
let worker2 = {
let cache = Arc::clone(&cache);
let shutdown = Arc::clone(&shutdown);

thread::spawn(move || {
let interval = Duration::from_millis(300);
let mut sleep_duration = interval;

// Repeat until the shutdown latch is set.
while !shutdown.load(Ordering::Relaxed) {
thread::sleep(sleep_duration);
let start = Instant::now();
cache.run_pending_tasks();
sleep_duration = interval.saturating_sub(start.elapsed());
}
})
};

// Insert 9 entries.
// - The last 2 entries will be evicted due to the capacity limit.
// - The remaining 7 entries will be evicted after 3 seconds, and then the worker
// thread will reinsert them with modified values.
for i in 1..=9 {
thread::sleep(Duration::from_millis(100));
let key = i.to_string();
let value = i;
println!("Inserting {} with value {}.", key, value);
cache.insert(key, value);
}

// Wait for 8 seconds.
thread::sleep(Duration::from_secs(8));

// Shutdown the worker threads.
snd.send(Command::Shutdown).expect("Cannot send");
worker1.join().expect("The worker thread 1 panicked");

shutdown.store(true, Ordering::Relaxed);
worker2.join().expect("The worker thread 2 panicked");
}

0 comments on commit 521193a

Please sign in to comment.