Add async cache (futures-aware cache) #7

Merged 15 commits on Feb 12, 2021
12 changes: 9 additions & 3 deletions .github/workflows/CI.yml
@@ -35,17 +35,23 @@ jobs:
override: true
components: rustfmt, clippy

- name: Build
- name: Build (no features)
uses: actions-rs/cargo@v1
with:
command: build

- name: Run tests
- name: Run tests (no features)
uses: actions-rs/cargo@v1
with:
command: test
args: --release

- name: Run tests (future)
uses: actions-rs/cargo@v1
with:
command: test
args: --release --features future

- name: Run Rustfmt
uses: actions-rs/cargo@v1
with:
@@ -56,4 +62,4 @@ jobs:
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: -- -D warnings
args: --features future -- -D warnings
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -1,4 +1,5 @@
{
"rust-analyzer.cargo.features": ["future"],
"cSpell.words": [
"CLFU",
"Deque",
@@ -9,6 +10,7 @@
"Moka",
"Ristretto",
"Tatsuya",
"actix",
"ahash",
"benmanes",
"clippy",
@@ -18,6 +20,7 @@
"mpsc",
"nanos",
"nocapture",
"runtimes",
"rustdoc",
"rustfmt",
"semver",
7 changes: 7 additions & 0 deletions RELEASES.md → CHANGELOG.md
@@ -1,5 +1,12 @@
# Moka — Release Notes

## Version 0.2.0

### Features

- Introduce an asynchronous (futures aware) cache.


## Version 0.1.0

### Features
21 changes: 17 additions & 4 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.1.0"
version = "0.2.0"
authors = ["Tatsuya Kawano <tatsuya@hibaridb.org>"]
edition = "2018"

@@ -13,13 +13,19 @@ keywords = ["concurrent", "cache"]
categories = ["concurrency", "caching"]
readme = "README.md"
exclude = [".devcontainer", ".github", ".vscode"]

build = "build.rs"

# https://docs.rs/about/metadata
[package.metadata.docs.rs]
features = ["future"]

[features]
default = []
future = ["async-io"]

[dependencies]
cht = "0.4"
crossbeam-channel = "0.5"
# hierarchical_hash_wheel_timer = "1.0"
num_cpus = "1.13"
once_cell = "1.5"
parking_lot = "0.11"
@@ -28,10 +34,17 @@ parking_lot = "0.11"
quanta = "0.7.1"
scheduled-thread-pool = "0.2"

# Optional dependencies
async-io = { version = "1", optional = true }

[dev-dependencies]
actix-rt2 = { package = "actix-rt", version = "2", default-features = false }
actix-rt1 = { package = "actix-rt", version = "1", default-features = false }
async-std = { version = "1", features = ["attributes"] }
futures = "0.3"
getrandom = "0.2"
skeptic = "0.13"
tokio = { version = "1.2", features = ["rt-multi-thread", "macros" ] }
tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }

[build-dependencies]
skeptic = "0.13"
175 changes: 87 additions & 88 deletions README.md
@@ -9,11 +9,10 @@
Moka is a fast, concurrent cache library for Rust. Moka is inspired by
[Caffeine][caffeine-git] (Java) and [Ristretto][ristretto-git] (Go).

Moka provides a cache that supports full concurrency of retrievals and a high
expected concurrency for updates. It also provides a segmented cache for increased
concurrent update performance. These caches perform a best-effort bounding of a map
using an entry replacement algorithm to determine which entries to evict when the
capacity is exceeded.
Moka provides cache implementations that support full concurrency of retrievals and
a high expected concurrency for updates. They perform a best-effort bounding of a
concurrent hash map using an entry replacement algorithm to determine which entries
to evict when the capacity is exceeded.

[gh-actions-badge]: https://github.com/moka-rs/moka/workflows/CI/badge.svg
[release-badge]: https://img.shields.io/crates/v/moka.svg
@@ -32,40 +31,48 @@ capacity is exceeded.

## Features

- Thread-safe, highly concurrent in-memory cache implementations.
- Thread-safe, highly concurrent in-memory cache implementations:
- Synchronous (blocking) caches that can be shared across OS threads.
- An asynchronous (futures aware) cache that can be accessed inside and outside
of asynchronous contexts.
- Caches are bounded by the maximum number of entries.
- Maintains good hit rate by using entry replacement algorithms inspired by
- Maintains good hit rate by using an entry replacement algorithm inspired by
[Caffeine][caffeine-git]:
- Admission to a cache is controlled by the Least Frequently Used (LFU) policy.
- Eviction from a cache is controlled by the Least Recently Used (LRU) policy.
- Supports expiration policies:
- Time to live
- Time to idle

Moka currently does not provide `async` optimized caches. The synchronous (blocking)
caches in the current version can be safely used in async runtime such as Tokio or
async-std, but will not produce optimal performance under heavy updates. See
[this example][async-example] for more details. A near future version of Moka will
provide `async` optimized caches in addition to the sync caches.

[async-example]: #using-cache-with-an-async-runtime-tokio-async-std-etc


## Usage

Add this to your `Cargo.toml`:

```toml
[dependencies]
moka = "0.1"
moka = "0.2"
```

To use the asynchronous cache, enable a crate feature called "future".

```toml
[dependencies]
moka = { version = "0.2", features = ["future"] }
```


## Example: Synchronous Cache

The synchronous (blocking) caches are defined in the `sync` module.

Cache entries are manually added using the `insert` method, and are stored in the
cache until either evicted or manually invalidated.

Here's an example that reads and updates a cache by using multiple threads:

```rust
// Use the synchronous cache.
use moka::sync::Cache;

use std::thread;
@@ -121,56 +128,38 @@ fn main() {
```


### Avoiding cloning the value at `get`

The return type of `get` method is `Option<V>` instead of `Option<&V>`, where `V` is
the value type. Every time `get` is called for an existing key, it creates a clone of
the stored value `V` and returns it. This is because the `Cache` allows concurrent
updates from threads so a value stored in the cache can be dropped or replaced at any
time by any other thread. `get` cannot return a reference `&V` as it is impossible to
guarantee the value outlives the reference.

If you want to store values that are expensive to clone, wrap them in
`std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
thread-safe reference-counted pointer and its `clone()` method is cheap.

[rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html

```rust,ignore
use std::sync::Arc;

let key = ...
let large_value = vec![0u8; 2 * 1024 * 1024]; // 2 MiB

// When inserting, wrap large_value in an Arc.
cache.insert(key.clone(), Arc::new(large_value));
## Example: Asynchronous Cache

// get() will call Arc::clone() on the stored value, which is cheap.
cache.get(&key);
```
The asynchronous (futures aware) cache is defined in the `future` module.
It works with asynchronous runtimes such as [Tokio][tokio-crate],
[async-std][async-std-crate] and [actix-rt][actix-rt-crate].
To use the asynchronous cache, [enable a crate feature called "future"](#usage).

[tokio-crate]: https://crates.io/crates/tokio
[async-std-crate]: https://crates.io/crates/async-std
[actix-rt-crate]: https://crates.io/crates/actix-rt

## Using Cache with an Async Runtime (Tokio, async-std, etc.)
Cache entries are manually added using the `insert` method, and are stored in the
cache until either evicted or manually invalidated:

Currently, Moka does not provide `async` optimized caches. An update operation
(`insert` or `invalidate` method) can be blocked for a short time under heavy
updates. They employ locks, mpsc channels and thread sleeps that are not aware of the
[Future trait][std-future] in std. While `insert` or `invalidate` can be safely
called in an `async fn` or `async` block, they will not produce optimal performance
as they may prevent async tasks from switching while acquiring a lock.
- Inside an async context (`async fn` or `async` block), use the `insert` or
  `invalidate` methods to update the cache, and `await` them.
- Outside any async context, use the `blocking_insert` or `blocking_invalidate`
  methods. They will block for a short time under heavy updates.
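
The two call styles above can be sketched side by side. This is a hypothetical snippet, not taken from this PR's diff: it assumes a `Cache::new(max_capacity)` constructor, while the async `insert`/`invalidate` and the `blocking_insert`/`blocking_invalidate` methods are the ones described in the bullets above.

```rust,ignore
use moka::future::Cache;

#[tokio::main]
async fn main() {
    // Assumed constructor: a cache holding up to 100 entries.
    let cache: Cache<&str, String> = Cache::new(100);

    // Inside an async context: the update methods are async, so await them.
    cache.insert("a", String::from("alice")).await;
    cache.invalidate(&"a").await;

    // Outside any async context (e.g. a plain OS thread), use the
    // blocking variants. They may block briefly under heavy updates.
    let cache2 = cache.clone();
    std::thread::spawn(move || {
        cache2.blocking_insert("b", String::from("bob"));
        cache2.blocking_invalidate(&"b");
    })
    .join()
    .unwrap();
}
```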

Here is a similar program to the previous example, but using [Tokio][tokio-crate]
runtime:
Here is a program similar to the previous example, but using the asynchronous cache
with the [Tokio][tokio-crate] runtime:

```rust
```rust,ignore
// Cargo.toml
//
// [dependencies]
// tokio = { version = "1.1", features = ["rt-multi-thread", "macros" ] }
// moka = { version = "0.2", features = ["future"] }
// tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
// futures = "0.3"

use moka::sync::Cache;

use tokio::task;
// Use the asynchronous cache.
use moka::future::Cache;

#[tokio::main]
async fn main() {
@@ -196,27 +185,23 @@ async fn main() {
tokio::spawn(async move {
// Insert 64 entries. (NUM_KEYS_PER_TASK = 64)
for key in start..end {
// insert() may block for a short time under heavy updates,
// but can be safely called in an async block.
my_cache.insert(key, value(key));
// insert() is an async method, so await it.
my_cache.insert(key, value(key)).await;
// get() returns Option<String>, a clone of the stored value.
assert_eq!(my_cache.get(&key), Some(value(key)));
}

// Invalidate every fourth entry that was inserted.
for key in (start..end).step_by(4) {
// invalidate() may block for a short time under heavy updates,
// but can be safely called in an async block.
my_cache.invalidate(&key);
// invalidate() is an async method, so await it.
my_cache.invalidate(&key).await;
}
})
})
.collect();

// Wait for all tasks to complete.
for task in tasks {
task.await.expect("Failed");
}
futures::future::join_all(tasks).await;

// Verify the result.
for key in 0..(NUM_TASKS * NUM_KEYS_PER_TASK) {
@@ -229,13 +214,37 @@ async fn main() {
}
```

A near future version of Moka will provide `async` optimized caches in addition to
the synchronous caches.

[std-future]: https://doc.rust-lang.org/stable/std/future/trait.Future.html
[tokio-crate]: https://crates.io/crates/tokio
### Avoiding cloning the value at `get`

The return type of `get` method is `Option<V>` instead of `Option<&V>`, where `V` is
the value type. Every time `get` is called for an existing key, it creates a clone of
the stored value `V` and returns it. This is because the `Cache` allows concurrent
updates from threads so a value stored in the cache can be dropped or replaced at any
time by any other thread. `get` cannot return a reference `&V` as it is impossible to
guarantee the value outlives the reference.

If you want to store values that are expensive to clone, wrap them in
`std::sync::Arc` before storing them in the cache. [`Arc`][rustdoc-std-arc] is a
thread-safe reference-counted pointer and its `clone()` method is cheap.

## Usage: Expiration Policies
[rustdoc-std-arc]: https://doc.rust-lang.org/stable/std/sync/struct.Arc.html

```rust,ignore
use std::sync::Arc;

let key = ...
let large_value = vec![0u8; 2 * 1024 * 1024]; // 2 MiB

// When inserting, wrap large_value in an Arc.
cache.insert(key.clone(), Arc::new(large_value));

// get() will call Arc::clone() on the stored value, which is cheap.
cache.get(&key);
```
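
The cost difference is easy to demonstrate with the standard library alone. The following self-contained sketch (independent of Moka) shows that cloning the `Arc` handle only bumps a reference count while the 2 MiB buffer stays shared:

```rust
use std::sync::Arc;

fn main() {
    // A 2 MiB buffer behind an Arc.
    let large_value = Arc::new(vec![0u8; 2 * 1024 * 1024]);

    // Cheap: copies a pointer and increments the reference count;
    // the 2 MiB buffer itself is not duplicated.
    let handle = Arc::clone(&large_value);

    // Two handles now share one allocation.
    assert_eq!(Arc::strong_count(&large_value), 2);
    assert!(Arc::ptr_eq(&large_value, &handle));
    assert_eq!(handle.len(), 2 * 1024 * 1024);
}
```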


## Example: Expiration Policies

Moka supports the following expiration policies:

Expand Down Expand Up @@ -271,21 +280,6 @@ fn main() {
}
```
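
A builder-based configuration for these two policies might look as follows. This is a sketch only: the `CacheBuilder` method names `time_to_live` and `time_to_idle` are assumed from the policy names above, not confirmed by this diff.

```rust,ignore
use moka::sync::CacheBuilder;
use std::time::Duration;

// A cache holding up to 10,000 entries.
let cache = CacheBuilder::new(10_000)
    // Time to live: evict 30 minutes after insert. (Assumed method name.)
    .time_to_live(Duration::from_secs(30 * 60))
    // Time to idle: evict 5 minutes after last use. (Assumed method name.)
    .time_to_idle(Duration::from_secs(5 * 60))
    .build();

cache.insert(0, "zero");
```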

## Segmented Cache

Moka caches maintain internal data structures for entry replacement algorithms. These
structures are guarded by a lock and operations are applied in batches using a
dedicated worker thread to avoid lock contention. `sync::Cache` has only one worker
thread, so under heavy updates, the worker thread may not be able to catch up to the
updates. When this happens, an `insert` or `invalidate` call will be paused (blocked)
for a short time.

If this pause happens very often, you may want to switch to `sync::SegmentedCache`.
A segmented cache has multiple internal cache segments and each segment has its own
worker thread. This reduces the chance of such pauses.

Use the `segments` method of `CacheBuilder` to create a segmented cache.
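
A sketch of building such a cache, under the assumption that `segments` takes the desired number of segments and that `CacheBuilder::new` takes the maximum number of entries:

```rust,ignore
use moka::sync::CacheBuilder;

// Up to 100,000 entries split into 8 internal segments; each segment
// has its own worker thread, reducing the chance that heavy updates
// outpace a single worker.
let cache = CacheBuilder::new(100_000)
    .segments(8)
    .build();

cache.insert("key", "value");
```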


## Hashing Algorithm

Expand Down Expand Up @@ -326,7 +320,8 @@ considered a semver-breaking change.

## Road Map

- [ ] `async` optimized caches.
- [x] `async` optimized caches. (`v0.2.0`)
- [ ] Cache statistics. (Hit rate, etc.)
- [ ] Upgrade TinyLFU to Window TinyLFU.
- [ ] The variable (per-entry) expiration, using a hierarchical timer wheel.

@@ -341,8 +336,12 @@ brews espresso-like coffee using boiling water pressurized by steam.

## License

Moka is distributed under the terms of both the MIT license and the Apache License
(Version 2.0).
Moka is distributed under either of

- The MIT license
- The Apache License (Version 2.0)

at your option.

See [LICENSE-MIT](LICENSE-MIT) and [LICENSE-APACHE](LICENSE-APACHE) for details.
