Skip to content

Commit

Permalink
Merge pull request #165 from moka-rs/disable-housekeeper-threads
Browse files Browse the repository at this point in the history
Optionally disable thread pools in `sync::Cache`
  • Loading branch information
tatsuya6502 authored Aug 3, 2022
2 parents bf28c2b + 9f61c6b commit 7805070
Show file tree
Hide file tree
Showing 15 changed files with 605 additions and 130 deletions.
14 changes: 14 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ jobs:
# hashbrown >= v0.12 requires Rust 2021 edition.
# native-tls >= v0.2.9 requires more recent Rust version.
# async-global-executor >= 2.1 requires Rust 2021 edition.
# pull-down-cmark >= 0.9.2 requires Rust 2021 edition.
run: |
cargo update -p dashmap --precise 5.2.0
cargo update -p indexmap --precise 1.8.2
cargo update -p hashbrown --precise 0.11.2
cargo update -p native-tls --precise 0.2.8
cargo update -p async-global-executor --precise 2.0.4
cargo update -p pulldown-cmark --precise 0.9.1
- name: Show cargo tree
uses: actions-rs/cargo@v1
Expand All @@ -76,6 +78,18 @@ jobs:
command: test
args: --features sync

- name: Run tests (debug, sync feature, thread-pool test for sync::Cache)
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --features sync sync::cache::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (debug, sync feature, thread-pool test for sync::SegmentCache)
uses: actions-rs/cargo@v1
with:
command: test
args: --lib --features sync sync::segment::tests::enabling_and_disabling_thread_pools -- --exact --ignored

- name: Run tests (release, sync feature)
uses: actions-rs/cargo@v1
with:
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/CIQuantaDisabled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ jobs:
# hashbrown >= v0.12 requires Rust 2021 edition.
# native-tls >= v0.2.9 requires more recent Rust version.
# async-global-executor >= 2.1 requires Rust 2021 edition.
# pull-down-cmark >= 0.9.2 requires Rust 2021 edition.
run: |
cargo update -p dashmap --precise 5.2.0
cargo update -p indexmap --precise 1.8.2
cargo update -p hashbrown --precise 0.11.2
cargo update -p native-tls --precise 0.2.8
cargo update -p async-global-executor --precise 2.0.4
cargo update -p pulldown-cmark --precise 0.9.1
- name: Run tests (debug, but no quanta feature)
uses: actions-rs/cargo@v1
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

## Version 0.9.3

### Added

- Add a configuration option to the following caches to avoid to start the global
thread pools ([#165][gh-pull-0165]):
- `sync::Cache`
- `sync::SegmentedCache`

### Fixed

- Ensure that the following caches will drop the value of evicted entries immediately
Expand Down Expand Up @@ -463,6 +470,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).

[gh-pull-0169]: https://github.com/moka-rs/moka/pull/169/
[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0165]: https://github.com/moka-rs/moka/pull/165/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
[gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ log = { version = "0.4", optional = true }

[dev-dependencies]
actix-rt = { version = "2.7", default-features = false }
anyhow = "1.0"
anyhow = "1.0.19"
async-std = { version = "1.11", features = ["attributes"] }
env_logger = "0.9"
getrandom = "0.2"
Expand Down
12 changes: 12 additions & 0 deletions src/common/builder_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::time::Duration;

#[cfg(any(feature = "sync", feature = "future"))]
use super::concurrent::housekeeper;

const YEAR_SECONDS: u64 = 365 * 24 * 3600;

pub(crate) fn ensure_expirations_or_panic(
Expand All @@ -14,3 +17,12 @@ pub(crate) fn ensure_expirations_or_panic(
assert!(d <= max_duration, "time_to_idle is longer than 1000 years");
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn housekeeper_conf(thread_pool_enabled: bool) -> housekeeper::Configuration {
if thread_pool_enabled {
housekeeper::Configuration::new_thread_pool(true)
} else {
housekeeper::Configuration::new_blocking()
}
}
211 changes: 189 additions & 22 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ use super::{
unsafe_weak_pointer::UnsafeWeakPointer,
};

#[cfg(any(feature = "sync", feature = "future"))]
use super::atomic_time::AtomicInstant;

#[cfg(any(feature = "sync", feature = "future"))]
use crate::common::time::{CheckedTimeOps, Instant};

#[cfg(any(feature = "sync", feature = "future"))]
use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};

use parking_lot::Mutex;
use scheduled_thread_pool::JobHandle;
use std::{
Expand All @@ -18,6 +27,155 @@ use std::{
time::Duration,
};

pub(crate) trait InnerSync {
fn sync(&self, max_sync_repeats: usize) -> Option<SyncPace>;

#[cfg(any(feature = "sync", feature = "future"))]
fn now(&self) -> Instant;
}

#[derive(Clone, Debug)]
pub(crate) struct Configuration {
is_blocking: bool,
periodical_sync_enabled: bool,
}

impl Configuration {
#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn new_blocking() -> Self {
Self {
is_blocking: true,
periodical_sync_enabled: false,
}
}

pub(crate) fn new_thread_pool(periodical_sync_enabled: bool) -> Self {
Self {
is_blocking: false,
periodical_sync_enabled,
}
}
}

pub(crate) enum Housekeeper<T> {
Blocking(BlockingHousekeeper),
ThreadPool(ThreadPoolHousekeeper<T>),
}

impl<T> Housekeeper<T>
where
T: InnerSync + 'static,
{
pub(crate) fn new(inner: Weak<T>, config: Configuration) -> Self {
if config.is_blocking {
Housekeeper::Blocking(BlockingHousekeeper::default())
} else {
Housekeeper::ThreadPool(ThreadPoolHousekeeper::new(
inner,
config.periodical_sync_enabled,
))
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
match self {
Housekeeper::Blocking(h) => h.should_apply_reads(ch_len, now),
Housekeeper::ThreadPool(h) => h.should_apply_reads(ch_len, now),
}
}

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
match self {
Housekeeper::Blocking(h) => h.should_apply_writes(ch_len, now),
Housekeeper::ThreadPool(h) => h.should_apply_writes(ch_len, now),
}
}

pub(crate) fn try_sync(&self, cache: &impl InnerSync) -> bool {
match self {
Housekeeper::Blocking(h) => h.try_sync(cache),
Housekeeper::ThreadPool(h) => h.try_schedule_sync(),
}
}

#[cfg(test)]
pub(crate) fn stop_periodical_sync_job(&self) {
match self {
Housekeeper::Blocking(_) => (),
Housekeeper::ThreadPool(h) => h.stop_periodical_sync_job(),
}
}
}

pub(crate) struct BlockingHousekeeper {
is_sync_running: AtomicBool,
#[cfg(any(feature = "sync", feature = "future"))]
sync_after: AtomicInstant,
}

impl Default for BlockingHousekeeper {
fn default() -> Self {
Self {
is_sync_running: Default::default(),
#[cfg(any(feature = "sync", feature = "future"))]
sync_after: AtomicInstant::new(Self::sync_after(Instant::now())),
}
}
}

impl BlockingHousekeeper {
#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now)
}

#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool {
self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now)
}

#[cfg(any(feature = "sync", feature = "future"))]
#[inline]
fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool {
ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now
}

fn try_sync<T: InnerSync>(&self, cache: &T) -> bool {
// Try to flip the value of sync_scheduled from false to true.
match self.is_sync_running.compare_exchange(
false,
true,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
#[cfg(any(feature = "sync", feature = "future"))]
{
let now = cache.now();
self.sync_after.set_instant(Self::sync_after(now));
}

cache.sync(MAX_SYNC_REPEATS);

self.is_sync_running.store(false, Ordering::Release);
true
}
Err(_) => false,
}
}

#[cfg(any(feature = "sync", feature = "future"))]
fn sync_after(now: Instant) -> Instant {
let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS);
let ts = now.checked_add(dur);
// Assuming that `now` is current wall clock time, this should never fail at
// least next millions of years.
ts.expect("Timestamp overflow")
}
}

#[derive(PartialEq, Eq)]
pub(crate) enum SyncPace {
Normal,
Expand All @@ -34,11 +192,7 @@ impl SyncPace {
}
}

pub(crate) trait InnerSync {
fn sync(&self, max_sync_repeats: usize) -> Option<SyncPace>;
}

pub(crate) struct Housekeeper<T> {
pub(crate) struct ThreadPoolHousekeeper<T> {
inner: Arc<Mutex<UnsafeWeakPointer<T>>>,
thread_pool: Arc<ThreadPool>,
is_shutting_down: Arc<AtomicBool>,
Expand All @@ -48,7 +202,7 @@ pub(crate) struct Housekeeper<T> {
_marker: PhantomData<T>,
}

impl<T> Drop for Housekeeper<T> {
impl<T> Drop for ThreadPoolHousekeeper<T> {
fn drop(&mut self) {
// Disallow to create and/or run sync jobs by now.
self.is_shutting_down.store(true, Ordering::Release);
Expand Down Expand Up @@ -78,30 +232,34 @@ impl<T> Drop for Housekeeper<T> {
}

// functions/methods used by Cache
impl<T: InnerSync> Housekeeper<T>
impl<T> ThreadPoolHousekeeper<T>
where
T: 'static,
T: InnerSync + 'static,
{
pub(crate) fn new(inner: Weak<T>) -> Self {
fn new(inner: Weak<T>, periodical_sync_enable: bool) -> Self {
use super::thread_pool::PoolName;

let thread_pool = ThreadPoolRegistry::acquire_pool(PoolName::Housekeeper);
let inner_ptr = Arc::new(Mutex::new(UnsafeWeakPointer::from_weak_arc(inner)));
let is_shutting_down = Arc::new(AtomicBool::new(false));
let periodical_sync_running = Arc::new(Mutex::new(()));

let sync_job = Self::start_periodical_sync_job(
&thread_pool,
Arc::clone(&inner_ptr),
Arc::clone(&is_shutting_down),
Arc::clone(&periodical_sync_running),
);
let maybe_sync_job = if periodical_sync_enable {
Some(Self::start_periodical_sync_job(
&thread_pool,
Arc::clone(&inner_ptr),
Arc::clone(&is_shutting_down),
Arc::clone(&periodical_sync_running),
))
} else {
None
};

Self {
inner: inner_ptr,
thread_pool,
is_shutting_down,
periodical_sync_job: Mutex::new(Some(sync_job)),
periodical_sync_job: Mutex::new(maybe_sync_job),
periodical_sync_running,
on_demand_sync_scheduled: Arc::new(AtomicBool::new(false)),
_marker: PhantomData::default(),
Expand Down Expand Up @@ -139,9 +297,17 @@ where
.execute_with_dynamic_delay(initial_delay, housekeeper_closure)
}

pub(crate) fn try_schedule_sync(&self) -> bool {
// TODO: Check if these `Orderings` are correct.
#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_reads(&self, ch_len: usize, _now: Instant) -> bool {
ch_len >= READ_LOG_FLUSH_POINT
}

#[cfg(any(feature = "sync", feature = "future"))]
fn should_apply_writes(&self, ch_len: usize, _now: Instant) -> bool {
ch_len >= WRITE_LOG_FLUSH_POINT
}

fn try_schedule_sync(&self) -> bool {
// If shutting down, do not schedule the task.
if self.is_shutting_down.load(Ordering::Acquire) {
return false;
Expand Down Expand Up @@ -169,13 +335,14 @@ where
}

#[cfg(test)]
pub(crate) fn periodical_sync_job(&self) -> &Mutex<Option<JobHandle>> {
&self.periodical_sync_job
pub(crate) fn stop_periodical_sync_job(&self) {
if let Some(j) = self.periodical_sync_job.lock().take() {
j.cancel();
}
}
}

// private functions/methods
impl<T: InnerSync> Housekeeper<T> {
impl<T: InnerSync> ThreadPoolHousekeeper<T> {
fn call_sync(unsafe_weak_ptr: &Arc<Mutex<UnsafeWeakPointer<T>>>) -> Option<SyncPace> {
let lock = unsafe_weak_ptr.lock();
// Restore the Weak pointer to Inner<K, V, S>.
Expand Down
Loading

0 comments on commit 7805070

Please sign in to comment.