Skip to content

Commit

Permalink
Merge pull request #350 from peter-scholtens/main
Browse files Browse the repository at this point in the history
Example for cascading drop triggered by eviction
  • Loading branch information
tatsuya6502 authored Dec 28, 2023
2 parents 4e26b76 + 1c302a0 commit e63c842
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 8 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ rustdoc-args = ["--cfg", "docsrs"]
name = "async_example"
required-features = ["future"]

[[example]]
name = "cascading_drop_async"
required-features = ["future"]

[[example]]
name = "sync_example"
required-features = ["sync"]
Expand Down
189 changes: 189 additions & 0 deletions examples/cascading_drop_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use moka::future::Cache;
use std::collections::btree_map;
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::time::Duration;

#[derive(Debug)]
pub struct User {
user_id: u64, // Needed as key in BTreeMap when executing a recursive Drop of a Session
name: String,
friends: Vec<Arc<Mutex<User>>>,
}

impl User {
pub fn print_friends(&self) {
print!("User {} has friends ", self.name);
for f in &self.friends {
print!("{}, ", f.lock().unwrap().name);
}
println!();
}
}

impl Drop for User {
fn drop(&mut self) {
println!("Dropping user {}", self.name);
}
}

pub struct Session {
ptr: Option<Arc<Mutex<User>>>,
sender: std::sync::mpsc::Sender<u64>,
}

impl Drop for Session {
fn drop(&mut self) {
let user_id;

Check failure on line 41 in examples/cascading_drop_async.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded late initialization

error: unneeded late initialization --> examples/cascading_drop_async.rs:41:9 | 41 | let user_id; | ^^^^^^^^^^^^ created here 42 | user_id = self.ptr.as_ref().unwrap().lock().unwrap().user_id; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ initialised here | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_late_init = note: `-D clippy::needless-late-init` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::needless_late_init)]` help: declare `user_id` here | 42 | let user_id = self.ptr.as_ref().unwrap().lock().unwrap().user_id; | ~~~~~~~~~~~
user_id = self.ptr.as_ref().unwrap().lock().unwrap().user_id;
println!("Dropping session holding a reference to user {}", user_id);
self.ptr = None; // Must drop Arc before verify Btree!!!
let _ = self.sender.send(user_id);
}
}

#[tokio::main]
async fn main() {
// For a webserver you may want to access the users via their cached session
// or via their user number, as they can be friends of each other.
// Using the Drop trait for a Session, orphaned users will get pruned.
//
// Create some users.
let user1 = Arc::new(Mutex::new(User {
user_id: 1,
name: String::from("Alice"),
friends: vec![],
}));
let user2 = Arc::new(Mutex::new(User {
user_id: 2,
name: String::from("Bob"),
friends: vec![],
}));
// There will be no session of user Charlie, but he will connected as friend.
let user3 = Arc::new(Mutex::new(User {
user_id: 3,
name: String::from("Charlie"),
friends: vec![],
}));
// Connect their friends to them.
user2.lock().unwrap().friends.push(user1.clone());
user2.lock().unwrap().friends.push(user3.clone());
user2.lock().unwrap().print_friends();

// Store users names in a B-tree by number.
let mut group_tree = BTreeMap::new();
group_tree.insert(1, user1.clone());
group_tree.insert(2, user2.clone());
// The group_tree MUST consume user3 here, and not a clone, otherwise
// strong_count() reports that user3 still has another (unused) reference!
group_tree.insert(3, user3);

// Create mpsc channel for pruning user-ids in B-tree.
let (send, recv) = mpsc::channel::<u64>();
let send_cl = send.clone();
let group_tree = Arc::new(Mutex::new(group_tree));
let group_tree_cl = group_tree.clone();
thread::spawn(move || loop {
for u in recv.iter() {
println!(
"user id {} has strong count: {}",
u,
Arc::strong_count(group_tree_cl.lock().unwrap().get(&u).unwrap())
);
let mut verify_queue = Vec::new();
match group_tree_cl.lock().unwrap().entry(u) {
btree_map::Entry::Occupied(e) if Arc::strong_count(e.get()) < 2 => {
let u = e.remove();
for f in u.lock().unwrap().friends.iter() {
let u = f.lock().unwrap().user_id;
verify_queue.push(u);
}
}
_ => {}
};
// drop here:
if !verify_queue.is_empty() {
println!("Send users to verification queue: {:?}", verify_queue);
for i in verify_queue {
let _ = send_cl.send(i);
}
}
}
});

// Make an artificially small cache and 1-second ttl to observe pruning of the tree.
let ttl = 3;
let sessions_cache = Cache::builder()
.max_capacity(10)
.time_to_live(Duration::from_secs(ttl))
.eviction_listener(|key, value: Arc<Mutex<Session>>, cause| {
println!(
"Evicted session with key {:08X} of user_id {:?} because {:?}",
*key,
value
.lock()
.unwrap()
.ptr
.as_ref()
.unwrap()
.lock()
.unwrap()
.user_id,
cause
)
})
.build();
// To create some simple CRC-32 session keys with Bash do:
// for ((i = 1; i < 4 ; i++)); do rhash <(echo "$i")|tail -1; done

// Alice's session on browser
let session1 = Session {
ptr: Some(user1.clone()),
sender: send.clone(),
};
sessions_cache
.insert(0x6751FC53, Arc::new(Mutex::new(session1)))
.await;

// Alice's second session on smartphone
let session2 = Session {
ptr: Some(user1),
sender: send.clone(),
};
sessions_cache
.insert(0x4C7CAF90, Arc::new(Mutex::new(session2)))
.await;

// Add also Bob's session
let session3 = Session {
ptr: Some(user2),
sender: send.clone(),
};
sessions_cache
.insert(0x55679ED1, Arc::new(Mutex::new(session3)))
.await;

// Show cache content
for (key, value) in sessions_cache.iter() {
let session = value.lock().unwrap();
println!(
"Found session {:08X} from user_id: {}",
*key,
session.ptr.as_ref().unwrap().lock().unwrap().user_id
);
}

println!("Waiting");
for t in 1..=ttl + 1 {
sleep(Duration::from_secs(1));
sessions_cache.get(&0).await;
sessions_cache.run_pending_tasks().await;
println!("t = {}, pending: {}", t, sessions_cache.entry_count());
}
assert!(group_tree.lock().unwrap().is_empty());
println!("Exit program.");
}
23 changes: 15 additions & 8 deletions examples/eviction_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::time::Duration;

fn main() {
// Make an artificially small cache and 1-second ttl to observe eviction listener.
let ttl = 1;
{
let cache = Cache::builder()
.max_capacity(2)
.time_to_live(Duration::from_secs(1))
.time_to_live(Duration::from_secs(ttl))
.eviction_listener(|key, value, cause| {
println!("Evicted ({key:?},{value:?}) because {cause:?}")
})
Expand All @@ -20,24 +21,30 @@ fn main() {
// Replaced and Size.
cache.insert(&2, "two".to_string());
// With 1-second ttl, keys 0 and 1 will be evicted if we wait long enough.
sleep(Duration::from_secs(2));
sleep(Duration::from_secs(ttl + 1));
println!("Wake up!");
cache.insert(&3, "three".to_string());
cache.insert(&4, "four".to_string());
let _ = cache.remove(&3);

// Remove from cache and return value:
if let Some(v) = cache.remove(&3) {
println!("Removed: {v}")
};
// Or remove from cache without returning the value.
cache.invalidate(&4);

cache.insert(&5, "five".to_string());

// invalidate_all() removes entries using a background thread, so there will
// be some delay before entries are removed and the eviction listener is
// called. If you want to remove all entries immediately, call sync() method
// repeatedly like the loop below.
// called. If you want to remove all entries immediately, call
// run_pending_tasks() method repeatedly like the loop below.
cache.invalidate_all();
loop {
// Synchronization is limited to at most 500 entries for each call.
cache.run_pending_tasks();
// Check if all is done. Calling entry_count() requires calling sync()
// first!
// Check if all is done. Calling entry_count() requires calling
// run_pending_tasks() first!
if cache.entry_count() == 0 {
break;
}
Expand All @@ -46,7 +53,7 @@ fn main() {
cache.insert(&6, "six".to_string());
// When cache is dropped eviction listener is not called. Either call
// invalidate_all() or wait longer than ttl.
sleep(Duration::from_secs(2));
sleep(Duration::from_secs(ttl + 1));
} // cache is dropped here.

println!("Cache structure removed.");
Expand Down

0 comments on commit e63c842

Please sign in to comment.