Skip to content

Commit

Permalink
Merge pull request #417 from moka-rs/evict-more-entries-at-once
Browse files Browse the repository at this point in the history
Make a single call to `run_pending_tasks` to evict as many entries as possible from the cache
  • Loading branch information
tatsuya6502 authored Apr 16, 2024
2 parents 8ba2a0a + ccb285b commit c56d646
Show file tree
Hide file tree
Showing 13 changed files with 1,120 additions and 356 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Moka Cache — Change Log

## Version 0.12.7

### Changed

- Ensure that a single call to `run_pending_tasks` evicts as many entries as possible
from the cache ([#417][gh-pull-0417]).


## Version 0.12.6

### Fixed
Expand Down Expand Up @@ -872,6 +880,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (Mar 25, 2021).
[gh-issue-0034]: https://github.com/moka-rs/moka/issues/34/
[gh-issue-0031]: https://github.com/moka-rs/moka/issues/31/

[gh-pull-0417]: https://github.com/moka-rs/moka/pull/417/
[gh-pull-0390]: https://github.com/moka-rs/moka/pull/390/
[gh-pull-0384]: https://github.com/moka-rs/moka/pull/384/
[gh-pull-0382]: https://github.com/moka-rs/moka/pull/382/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.12.6"
version = "0.12.7"
edition = "2021"
# Rust 1.65 was released on Nov 3, 2022.
rust-version = "1.65"
Expand Down
56 changes: 56 additions & 0 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

pub(crate) mod builder_utils;
pub(crate) mod concurrent;
pub(crate) mod deque;
Expand All @@ -10,6 +12,11 @@ pub(crate) mod timer_wheel;
#[cfg(test)]
pub(crate) mod test_utils;

use self::concurrent::constants::{
DEFAULT_EVICTION_BATCH_SIZE, DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS,
DEFAULT_MAX_LOG_SYNC_REPEATS,
};

// Note: `CacheRegion` cannot have more than four enum variants. This is because
// `crate::{sync,unsync}::DeqNodes` uses a `tagptr::TagNonNull<DeqNode<T>, 2>`
// pointer, where the 2-bit tag is `CacheRegion`.
Expand Down Expand Up @@ -56,6 +63,55 @@ impl PartialEq<usize> for CacheRegion {
}
}

/// Tuning parameters that a `Housekeeper` uses when it runs the cache's pending
/// maintenance tasks (`run_pending_tasks`).
#[derive(Clone, Debug)]
pub(crate) struct HousekeeperConfig {
/// The timeout duration for the `run_pending_tasks` method. This is a safeguard
/// to prevent cache read/write operations (that may call `run_pending_tasks`
/// internally) from being blocked for a long time when the user wrote a slow
/// eviction listener closure.
///
/// Used only when the eviction listener closure is set for the cache instance.
///
/// Default: `DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS` (in milliseconds).
pub(crate) maintenance_task_timeout: Duration,
/// The maximum repeat count for receiving operation logs from the read and write
/// log channels. Default: `DEFAULT_MAX_LOG_SYNC_REPEATS`.
pub(crate) max_log_sync_repeats: u32,
/// The batch size of entries to be processed by each internal eviction method.
/// Default: `DEFAULT_EVICTION_BATCH_SIZE`.
pub(crate) eviction_batch_size: u32,
}

impl Default for HousekeeperConfig {
    /// Builds a configuration in which every setting takes its crate-wide default:
    /// `DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS` (as a millisecond `Duration`),
    /// `DEFAULT_MAX_LOG_SYNC_REPEATS` and `DEFAULT_EVICTION_BATCH_SIZE`.
    fn default() -> Self {
        let maintenance_task_timeout =
            Duration::from_millis(DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS);
        // The repeat-count constant is declared as `usize`; the config stores `u32`.
        let max_log_sync_repeats = DEFAULT_MAX_LOG_SYNC_REPEATS as u32;
        let eviction_batch_size = DEFAULT_EVICTION_BATCH_SIZE;

        Self {
            maintenance_task_timeout,
            max_log_sync_repeats,
            eviction_batch_size,
        }
    }
}

impl HousekeeperConfig {
    /// Test-only constructor. Each argument overrides one setting; passing `None`
    /// keeps the crate-wide default for that setting.
    #[cfg(test)]
    pub(crate) fn new(
        maintenance_task_timeout: Option<Duration>,
        max_log_sync_repeats: Option<u32>,
        eviction_batch_size: Option<u32>,
    ) -> Self {
        let timeout = match maintenance_task_timeout {
            Some(duration) => duration,
            None => Duration::from_millis(DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS),
        };
        let repeats = match max_log_sync_repeats {
            Some(count) => count,
            // The constant is declared as `usize`; the config stores `u32`.
            None => DEFAULT_MAX_LOG_SYNC_REPEATS as u32,
        };
        let batch_size = match eviction_batch_size {
            Some(size) => size,
            None => DEFAULT_EVICTION_BATCH_SIZE,
        };

        Self {
            maintenance_task_timeout: timeout,
            max_log_sync_repeats: repeats,
            eviction_batch_size: batch_size,
        }
    }
}

// Ensures the value fits in a range of `128u32..=u32::MAX`.
pub(crate) fn sketch_capacity(max_capacity: u64) -> u32 {
max_capacity.try_into().unwrap_or(u32::MAX).max(128)
Expand Down
24 changes: 18 additions & 6 deletions src/common/concurrent/constants.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
pub(crate) const MAX_SYNC_REPEATS: usize = 4;
pub(crate) const PERIODICAL_SYNC_INITIAL_DELAY_MILLIS: u64 = 300;
pub(crate) const DEFAULT_MAX_LOG_SYNC_REPEATS: usize = 4;
pub(crate) const LOG_SYNC_INTERVAL_MILLIS: u64 = 300;

pub(crate) const READ_LOG_FLUSH_POINT: usize = 512;
pub(crate) const READ_LOG_SIZE: usize = READ_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
pub(crate) const READ_LOG_FLUSH_POINT: usize = 64;
pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 64;

pub(crate) const WRITE_LOG_FLUSH_POINT: usize = 512;
pub(crate) const WRITE_LOG_SIZE: usize = WRITE_LOG_FLUSH_POINT * (MAX_SYNC_REPEATS + 2);
// 384 elements
pub(crate) const READ_LOG_CH_SIZE: usize =
READ_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2);

// 384 elements
pub(crate) const WRITE_LOG_CH_SIZE: usize =
WRITE_LOG_FLUSH_POINT * (DEFAULT_MAX_LOG_SYNC_REPEATS + 2);

// TODO: Calculate the batch size based on the number of entries in the cache (or an
// estimated number of entries to evict)
pub(crate) const DEFAULT_EVICTION_BATCH_SIZE: u32 = WRITE_LOG_CH_SIZE as u32;

/// The default timeout duration for the `run_pending_tasks` method.
pub(crate) const DEFAULT_MAINTENANCE_TASK_TIMEOUT_MILLIS: u64 = 100;

#[cfg(feature = "sync")]
pub(crate) const WRITE_RETRY_INTERVAL_MICROS: u64 = 50;
76 changes: 66 additions & 10 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::constants::{MAX_SYNC_REPEATS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS};
use super::constants::LOG_SYNC_INTERVAL_MILLIS;

use super::{
atomic_time::AtomicInstant,
constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT},
};
use crate::common::time::{CheckedTimeOps, Instant};
use crate::common::HousekeeperConfig;

use parking_lot::{Mutex, MutexGuard};
use std::{
Expand All @@ -13,34 +14,85 @@ use std::{
};

pub(crate) trait InnerSync {
fn run_pending_tasks(&self, max_sync_repeats: usize);
/// Runs the pending tasks. Returns `true` if there are more entries to evict in
/// the next run.
fn run_pending_tasks(
&self,
timeout: Option<Duration>,
max_log_sync_repeats: u32,
eviction_batch_size: u32,
) -> bool;

fn now(&self) -> Instant;
}

/// Holds the scheduling state and the settings used when running a cache's
/// pending maintenance tasks (`InnerSync::run_pending_tasks`).
pub(crate) struct Housekeeper {
/// Lock whose guard is handed to `do_run_pending_tasks`.
/// NOTE(review): presumably serializes maintenance runs — confirm against the
/// callers that acquire it.
run_lock: Mutex<()>,
/// Re-armed via `sync_after` at the start of each maintenance run.
run_after: AtomicInstant,
/// A flag to indicate if the last call on `run_pending_tasks` method left some
/// entries to evict.
///
/// Used only when the eviction listener closure is set for this cache instance
/// because, if not, `run_pending_tasks` will never leave entries to evict.
more_entries_to_evict: Option<AtomicBool>,
/// The timeout duration for the `run_pending_tasks` method. This is a safeguard
/// to prevent cache read/write operations (that may call `run_pending_tasks`
/// internally) from being blocked for a long time when the user wrote a slow
/// eviction listener closure.
///
/// Used only when the eviction listener closure is set for this cache instance.
maintenance_task_timeout: Option<Duration>,
/// The maximum repeat count for receiving operation logs from the read and write
/// log channels. Default: `DEFAULT_MAX_LOG_SYNC_REPEATS`.
max_log_sync_repeats: u32,
/// The batch size of entries to be processed by each internal eviction method.
/// Default: `DEFAULT_EVICTION_BATCH_SIZE`.
eviction_batch_size: u32,
/// Initialized to `true` by `new`.
/// NOTE(review): presumably gates automatic maintenance runs — the readers of
/// this flag are not visible here; confirm.
auto_run_enabled: AtomicBool,
}

impl Default for Housekeeper {
fn default() -> Self {
impl Housekeeper {
/// Creates a housekeeper from `config`.
///
/// The `more_entries_to_evict` flag (initially `false`) and the maintenance task
/// timeout are populated only when `is_eviction_listener_enabled` is `true`;
/// otherwise both are `None`. Auto-run starts enabled, and `run_after` is
/// initialized from `Instant::now()` via `sync_after`.
pub(crate) fn new(is_eviction_listener_enabled: bool, config: HousekeeperConfig) -> Self {
    // Both listener-only settings are present or absent together.
    let more_entries_to_evict = is_eviction_listener_enabled.then(|| AtomicBool::new(false));
    let maintenance_task_timeout =
        is_eviction_listener_enabled.then_some(config.maintenance_task_timeout);

    Self {
        run_lock: Mutex::default(),
        run_after: AtomicInstant::new(Self::sync_after(Instant::now())),
        more_entries_to_evict,
        maintenance_task_timeout,
        max_log_sync_repeats: config.max_log_sync_repeats,
        eviction_batch_size: config.eviction_batch_size,
        auto_run_enabled: AtomicBool::new(true),
    }
}
}

impl Housekeeper {
pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now)
self.more_entries_to_evict() || self.should_apply(ch_len, READ_LOG_FLUSH_POINT, now)
}

pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now)
self.more_entries_to_evict() || self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT, now)
}

/// Returns the current value of the `more_entries_to_evict` flag, or `false`
/// when the flag is absent (`None`).
#[inline]
fn more_entries_to_evict(&self) -> bool {
    match &self.more_entries_to_evict {
        Some(flag) => flag.load(Ordering::Acquire),
        None => false,
    }
}

/// Stores `v` into the `more_entries_to_evict` flag. Does nothing when the flag
/// is absent (`None`).
fn set_more_entries_to_evict(&self, v: bool) {
    if let Some(pending) = self.more_entries_to_evict.as_ref() {
        pending.store(v, Ordering::Release);
    }
}

#[inline]
Expand All @@ -66,11 +118,15 @@ impl Housekeeper {
fn do_run_pending_tasks<T: InnerSync>(&self, cache: &T, _lock: MutexGuard<'_, ()>) {
let now = cache.now();
self.run_after.set_instant(Self::sync_after(now));
cache.run_pending_tasks(MAX_SYNC_REPEATS);
let timeout = self.maintenance_task_timeout;
let repeats = self.max_log_sync_repeats;
let batch_size = self.eviction_batch_size;
let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size);
self.set_more_entries_to_evict(more_to_evict);
}

fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let dur = Duration::from_millis(LOG_SYNC_INTERVAL_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is the current wall clock time, this should never fail for
// at least the next few million years.
Expand Down
Loading

0 comments on commit c56d646

Please sign in to comment.