Merge pull request #7 from moka-rs/async-cache
Add async cache (futures-aware cache)
tatsuya6502 authored Feb 12, 2021
2 parents 5d75e09 + 5c0d24e commit 5020a92
Showing 19 changed files with 2,140 additions and 888 deletions.
12 changes: 9 additions & 3 deletions .github/workflows/CI.yml
@@ -35,17 +35,23 @@ jobs:
override: true
components: rustfmt, clippy

- name: Build
- name: Build (no features)
uses: actions-rs/cargo@v1
with:
command: build

- name: Run tests
- name: Run tests (no features)
uses: actions-rs/cargo@v1
with:
command: test
args: --release

- name: Run tests (future)
uses: actions-rs/cargo@v1
with:
command: test
args: --release --features future

- name: Run Rustfmt
uses: actions-rs/cargo@v1
with:
@@ -56,4 +62,4 @@ jobs:
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: -- -D warnings
args: --features future -- -D warnings
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -1,4 +1,5 @@
{
"rust-analyzer.cargo.features": ["future"],
"cSpell.words": [
"CLFU",
"Deque",
@@ -9,6 +10,7 @@
"Moka",
"Ristretto",
"Tatsuya",
"actix",
"ahash",
"benmanes",
"clippy",
@@ -18,6 +20,7 @@
"mpsc",
"nanos",
"nocapture",
"runtimes",
"rustdoc",
"rustfmt",
"semver",
7 changes: 7 additions & 0 deletions RELEASES.md → CHANGELOG.md
@@ -1,5 +1,12 @@
# Moka — Release Notes

## Version 0.2.0

### Features

- Introduce an asynchronous (futures aware) cache.


## Version 0.1.0

### Features
21 changes: 17 additions & 4 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.1.0"
version = "0.2.0"
authors = ["Tatsuya Kawano <tatsuya@hibaridb.org>"]
edition = "2018"

@@ -13,13 +13,19 @@ keywords = ["concurrent", "cache"]
categories = ["concurrency", "caching"]
readme = "README.md"
exclude = [".devcontainer", ".github", ".vscode"]

build = "build.rs"

# https://docs.rs/about/metadata
[package.metadata.docs.rs]
features = ["future"]

[features]
default = []
future = ["async-io"]

[dependencies]
cht = "0.4"
crossbeam-channel = "0.5"
# hierarchical_hash_wheel_timer = "1.0"
num_cpus = "1.13"
once_cell = "1.5"
parking_lot = "0.11"
@@ -28,10 +34,17 @@ parking_lot = "0.11"
quanta = "0.7.1"
scheduled-thread-pool = "0.2"

# Optional dependencies
async-io = { version = "1", optional = true }

[dev-dependencies]
actix-rt2 = { package = "actix-rt", version = "2", default-features = false }
actix-rt1 = { package = "actix-rt", version = "1", default-features = false }
async-std = { version = "1", features = ["attributes"] }
futures = "0.3"
getrandom = "0.2"
skeptic = "0.13"
tokio = { version = "1.2", features = ["rt-multi-thread", "macros" ] }
tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }

[build-dependencies]
skeptic = "0.13"
175 changes: 87 additions & 88 deletions README.md
@@ -9,11 +9,10 @@
Moka is a fast, concurrent cache library for Rust. Moka is inspired by
[Caffeine][caffeine-git] (Java) and [Ristretto][ristretto-git] (Go).

Moka provides a cache that supports full concurrency of retrievals and a high
expected concurrency for updates. It also provides a segmented cache for increased
concurrent update performance. These caches perform a best-effort bounding of a map
using an entry replacement algorithm to determine which entries to evict when the
capacity is exceeded.
Moka provides cache implementations that support full concurrency of retrievals and
a high expected concurrency for updates. They perform a best-effort bounding of a
concurrent hash map using an entry replacement algorithm to determine which entries
to evict when the capacity is exceeded.

[gh-actions-badge]: https://github.com/moka-rs/moka/workflows/CI/badge.svg
[release-badge]: https://img.shields.io/crates/v/moka.svg
@@ -32,40 +31,48 @@ capacity is exceeded.

## Features

- Thread-safe, highly concurrent in-memory cache implementations.
- Thread-safe, highly concurrent in-memory cache implementations:
- Synchronous (blocking) caches that can be shared across OS threads.
- An asynchronous (futures aware) cache that can be accessed inside and outside
of asynchronous contexts.
- Caches are bounded by the maximum number of entries.
- Maintains good hit rate by using entry replacement algorithms inspired by
- Maintains good hit rate by using an entry replacement algorithm inspired by
[Caffeine][caffeine-git]:
- Admission to a cache is controlled by the Least Frequently Used (LFU) policy.
- Eviction from a cache is controlled by the Least Recently Used (LRU) policy.
- Supports expiration policies:
- Time to live
- Time to idle

Moka currently does not provide `async` optimized caches. The synchronous (blocking)
caches in the current version can be safely used in async runtimes such as Tokio or
async-std, but will not produce optimal performance under heavy updates. See
[this example][async-example] for more details. A near future version of Moka will
provide `async` optimized caches in addition to the sync caches.

[async-example]: #using-cache-with-an-async-runtime-tokio-async-std-etc


## Usage

Add this to your `Cargo.toml`:

```toml
[dependencies]
moka = "0.1"
moka = "0.2"
```

To use the asynchronous cache, enable a crate feature called "future".

```toml
[dependencies]
moka = { version = "0.2", features = ["future"] }
```


## Example: Synchronous Cache

The synchronous (blocking) caches are defined in the `sync` module.

Cache entries are manually added using the `insert` method, and are stored in the cache
until either evicted or manually invalidated.

Here's an example that reads and updates a cache by using multiple threads:

```rust
// Use the synchronous cache.
use moka::sync::Cache;

use std::thread;
@@ -121,56 +128,38 @@ fn main() {
```


### Avoiding cloning the value at `get`

The return type of `get` method is `Option<V>` instead of `Option<&V>`, where `V` is
the value type. Every time `get` is called for an existing key, it creates a clone of
the stored value `V` and returns it. This is because the `Cache` allows concurrent
updates from threads so a value stored in the cache can be dropped or replaced at any
time by any other thread. `get` cannot return a reference `&V` as it is impossible to
guarantee the value outlives the reference.

If you want to store values that will be expensive to clone, wrap them in
`std::sync::Arc` before storing them in a cache. [`Arc`][rustdoc-std-arc] is a thread-safe
reference-counted pointer and its `clone()` method is cheap.

[rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html

```rust,ignore
use std::sync::Arc;
let key = ...
let large_value = vec![0u8; 2 * 1024 * 1024]; // 2 MiB
// When inserting, wrap the large_value in an Arc.
cache.insert(key.clone(), Arc::new(large_value));
## Example: Asynchronous Cache

// get() will call Arc::clone() on the stored value, which is cheap.
cache.get(&key);
```
The asynchronous (futures aware) cache is defined in the `future` module.
It works with asynchronous runtimes such as [Tokio][tokio-crate],
[async-std][async-std-crate] or [actix-rt][actix-rt-crate].
To use the asynchronous cache, [enable a crate feature called "future"](#usage).

[tokio-crate]: https://crates.io/crates/tokio
[async-std-crate]: https://crates.io/crates/async-std
[actix-rt-crate]: https://crates.io/crates/actix-rt

## Using Cache with an Async Runtime (Tokio, async-std, etc.)
Cache entries are manually added using an insert method, and are stored in the cache
until either evicted or manually invalidated:

Currently, Moka does not provide `async` optimized caches. Update operations
(the `insert` and `invalidate` methods) can be blocked for a short time under heavy
updates. They employ locks, mpsc channels and thread sleeps that are not aware of the
[Future trait][std-future] in std. While `insert` or `invalidate` can be safely
called in an `async fn` or `async` block, they will not produce optimal performance
as they may prevent async tasks from switching while acquiring a lock.
- Inside an async context (`async fn` or `async` block), use the `insert` or `invalidate`
methods to update the cache and `await` them.
- Outside any async context, use the `blocking_insert` or `blocking_invalidate`
methods, shown in the sketch after the example below. They will block for a short time
under heavy updates.

Here is a similar program to the previous example, but using [Tokio][tokio-crate]
runtime:
Here is a similar program to the previous example, but using the asynchronous cache
with the [Tokio][tokio-crate] runtime:

```rust
```rust,ignore
// Cargo.toml
//
// [dependencies]
// tokio = { version = "1.1", features = ["rt-multi-thread", "macros" ] }
// moka = { version = "0.2", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures = "0.3"
use moka::sync::Cache;

use tokio::task;
// Use the asynchronous cache.
use moka::future::Cache;
#[tokio::main]
async fn main() {
@@ -196,27 +185,23 @@ async fn main() {
tokio::spawn(async move {
// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
// insert() may block for a short time under heavy updates,
// but can be safely called in an async block.
my_cache.insert(key, value(key));
// insert() is an async method, so await it.
my_cache.insert(key, value(key)).await;
// get() returns Option<String>, a clone of the stored value.
assert_eq!(my_cache.get(&key), Some(value(key)));
}
// Invalidate every 4th element of the inserted entries.
for key in (start..end).step_by(4) {
// invalidate() may block for a short time under heavy updates,
// but can be safely called in an async block.
my_cache.invalidate(&key);
// invalidate() is an async method, so await it.
my_cache.invalidate(&key).await;
}
})
})
.collect();
// Wait for all tasks to complete.
for task in tasks {
task.await.expect("Failed");
}
futures::future::join_all(tasks).await;
// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
@@ -229,13 +214,37 @@ async fn main() {
}
```
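
For code that is not running inside an async runtime, the `blocking_insert` and
`blocking_invalidate` methods mentioned above can be used instead. Here is a minimal
sketch, assuming the `Cache::new(max_capacity)` constructor and the method signatures
implied by the example above:

```rust,ignore
// Use the asynchronous cache from non-async code.
use moka::future::Cache;

fn main() {
    // The constructor is assumed to take the maximum number of entries.
    let cache: Cache<u32, String> = Cache::new(100);

    // Outside an async context there is nothing to `await`,
    // so use the blocking variants of the update methods.
    cache.blocking_insert(0, "zero".to_string());

    // get() is not an async method; it returns a clone of the stored value.
    assert_eq!(cache.get(&0), Some("zero".to_string()));

    cache.blocking_invalidate(&0);
    assert_eq!(cache.get(&0), None);
}
```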

A near future version of Moka will provide `async` optimized caches in addition to
the synchronous caches.

[std-future]: https://doc.rust-lang.org/stable/std/future/trait.Future.html
[tokio-crate]: https://crates.io/crates/tokio
### Avoiding cloning the value at `get`

The return type of `get` method is `Option<V>` instead of `Option<&V>`, where `V` is
the value type. Every time `get` is called for an existing key, it creates a clone of
the stored value `V` and returns it. This is because the `Cache` allows concurrent
updates from threads so a value stored in the cache can be dropped or replaced at any
time by any other thread. `get` cannot return a reference `&V` as it is impossible to
guarantee the value outlives the reference.

If you want to store values that will be expensive to clone, wrap them in
`std::sync::Arc` before storing them in a cache. [`Arc`][rustdoc-std-arc] is a thread-safe
reference-counted pointer and its `clone()` method is cheap.

## Usage: Expiration Policies
[rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html

```rust,ignore
use std::sync::Arc;
let key = ...
let large_value = vec![0u8; 2 * 1024 * 1024]; // 2 MiB
// When inserting, wrap the large_value in an Arc.
cache.insert(key.clone(), Arc::new(large_value));
// get() will call Arc::clone() on the stored value, which is cheap.
cache.get(&key);
```


## Example: Expiration Policies

Moka supports the following expiration policies:

Expand Down Expand Up @@ -271,21 +280,6 @@ fn main() {
}
```
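
As a rough illustration, a bounded cache with both policies can be configured through
the `CacheBuilder`. This is a sketch only; `CacheBuilder::new(max_capacity)` and the
`time_to_live`/`time_to_idle` builder methods are assumptions, not code taken from
this commit:

```rust,ignore
use moka::sync::CacheBuilder;
use std::time::Duration;

fn main() {
    let cache = CacheBuilder::new(10_000)
        // Time to live (TTL): evict an entry 30 minutes after insertion.
        .time_to_live(Duration::from_secs(30 * 60))
        // Time to idle (TTI): evict an entry 5 minutes after its last use.
        .time_to_idle(Duration::from_secs(5 * 60))
        .build();

    cache.insert(0, "zero".to_string());
    assert_eq!(cache.get(&0), Some("zero".to_string()));
}
```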

## Segmented Cache

Moka caches maintain internal data structures for entry replacement algorithms. These
structures are guarded by a lock and operations are applied in batches using a
dedicated worker thread to avoid lock contention. `sync::Cache` has only one worker
thread, so under heavy updates, the worker thread may not be able to catch up to the
updates. When this happens, an `insert` or `invalidate` call will be paused (blocked)
for a short time.

If this pause happens very often, you may want to switch to `sync::SegmentedCache`.
A segmented cache has multiple internal cache segments and each segment has its own
worker thread. This will reduce the chance of such pauses.

Use the `segments` method of the `CacheBuilder` to create a segmented cache.
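
A minimal sketch of that builder call; the argument to `segments` (the number of
segments) and the rest of the chain are assumptions rather than code from this commit:

```rust,ignore
use moka::sync::CacheBuilder;

fn main() {
    // Up to 10,000 entries, split across 4 internal segments.
    // Each segment is assumed to get its own worker thread.
    let cache = CacheBuilder::new(10_000)
        .segments(4)
        .build();

    cache.insert(0, "zero".to_string());
    assert_eq!(cache.get(&0), Some("zero".to_string()));
}
```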


## Hashing Algorithm

@@ -326,7 +320,8 @@ considered a semver-breaking change.

## Road Map

- [ ] `async` optimized caches.
- [x] `async` optimized caches. (`v0.2.0`)
- [ ] Cache statistics. (Hit rate, etc.)
- [ ] Upgrade TinyLFU to Window TinyLFU.
- [ ] The variable (per-entry) expiration, using a hierarchical timer wheel.

@@ -341,8 +336,12 @@ brews espresso-like coffee using boiling water pressurized by steam.

## License

Moka is distributed under the terms of both the MIT license and the Apache License
(Version 2.0).
Moka is distributed under either of

- The MIT license
- The Apache License (Version 2.0)

at your option.

See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
