From 89d99128476ec7648c1cd2188d4ce0de9ae35deb Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 9 Jul 2022 14:43:56 +0800 Subject: [PATCH 01/15] Optionally disable thread pools in `sync::Cache` - Rename current thread-pool based `Housekeeper` to `ThreadPoolHousekeeper`. - Add `BlockingHousekeeper`. - Add `thread_pool_enabled` method to `sync::CacheBuilder`. --- src/common/builder_utils.rs | 12 +++ src/common/concurrent/housekeeper.rs | 132 ++++++++++++++++++++++----- src/common/concurrent/thread_pool.rs | 28 ++++-- src/dash/base_cache.rs | 39 ++++---- src/dash/cache.rs | 9 +- src/future/builder.rs | 2 + src/future/cache.rs | 43 +++++---- src/sync/builder.rs | 20 ++++ src/sync/cache.rs | 35 ++++--- src/sync/segment.rs | 7 +- src/sync_base/base_cache.rs | 27 +++--- 11 files changed, 258 insertions(+), 96 deletions(-) diff --git a/src/common/builder_utils.rs b/src/common/builder_utils.rs index 4db23a61..1ddb1a82 100644 --- a/src/common/builder_utils.rs +++ b/src/common/builder_utils.rs @@ -1,5 +1,8 @@ use std::time::Duration; +#[cfg(any(feature = "sync", feature = "future"))] +use super::concurrent::housekeeper; + const YEAR_SECONDS: u64 = 365 * 24 * 3600; pub(crate) fn ensure_expirations_or_panic( @@ -14,3 +17,12 @@ pub(crate) fn ensure_expirations_or_panic( assert!(d <= max_duration, "time_to_idle is longer than 1000 years"); } } + +#[cfg(any(feature = "sync", feature = "future"))] +pub(crate) fn housekeeper_conf(thread_pool_enabled: bool) -> housekeeper::Configuration { + if thread_pool_enabled { + housekeeper::Configuration::new_thread_pool(true) + } else { + housekeeper::Configuration::new_blocking() + } +} diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index f9d88a30..c62341dd 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -18,6 +18,93 @@ use std::{ time::Duration, }; +pub(crate) trait InnerSync { + fn sync(&self, max_sync_repeats: usize) -> Option; +} + 
+#[derive(Clone, Debug)] +pub(crate) struct Configuration { + is_blocking: bool, + periodical_sync_enabled: bool, +} + +impl Configuration { + #[cfg(any(feature = "sync", feature = "future"))] + pub(crate) fn new_blocking() -> Self { + Self { + is_blocking: true, + periodical_sync_enabled: false, + } + } + + pub(crate) fn new_thread_pool(periodical_sync_enable: bool) -> Self { + Self { + is_blocking: false, + periodical_sync_enabled: periodical_sync_enable, + } + } +} + +pub(crate) enum Housekeeper { + Blocking(BlockingHousekeeper), + ThreadPool(ThreadPoolHousekeeper), +} + +impl Housekeeper +where + T: InnerSync + 'static, +{ + pub(crate) fn new(inner: Weak, config: Configuration) -> Self { + if config.is_blocking { + Housekeeper::Blocking(BlockingHousekeeper::default()) + } else { + Housekeeper::ThreadPool(ThreadPoolHousekeeper::new( + inner, + config.periodical_sync_enabled, + )) + } + } + + pub(crate) fn try_sync(&self, cache: &impl InnerSync) -> bool { + match self { + Housekeeper::Blocking(h) => h.try_sync(cache), + Housekeeper::ThreadPool(h) => h.try_schedule_sync(), + } + } + + #[cfg(test)] + pub(crate) fn stop_periodical_sync_job(&self) { + match self { + Housekeeper::Blocking(_) => (), + Housekeeper::ThreadPool(h) => h.stop_periodical_sync_job(), + } + } +} + +#[derive(Default)] +pub(crate) struct BlockingHousekeeper { + is_sync_running: AtomicBool, +} + +impl BlockingHousekeeper { + fn try_sync(&self, cache: &T) -> bool { + // Try to flip the value of sync_scheduled from false to true. 
+ match self.is_sync_running.compare_exchange( + false, + true, + Ordering::Acquire, + Ordering::Relaxed, + ) { + Ok(_) => { + cache.sync(MAX_SYNC_REPEATS); + self.is_sync_running.store(false, Ordering::Release); + true + } + Err(_) => false, + } + } +} + #[derive(PartialEq, Eq)] pub(crate) enum SyncPace { Normal, @@ -34,11 +121,7 @@ impl SyncPace { } } -pub(crate) trait InnerSync { - fn sync(&self, max_sync_repeats: usize) -> Option; -} - -pub(crate) struct Housekeeper { +pub(crate) struct ThreadPoolHousekeeper { inner: Arc>>, thread_pool: Arc, is_shutting_down: Arc, @@ -48,7 +131,7 @@ pub(crate) struct Housekeeper { _marker: PhantomData, } -impl Drop for Housekeeper { +impl Drop for ThreadPoolHousekeeper { fn drop(&mut self) { // Disallow to create and/or run sync jobs by now. self.is_shutting_down.store(true, Ordering::Release); @@ -78,11 +161,11 @@ impl Drop for Housekeeper { } // functions/methods used by Cache -impl Housekeeper +impl ThreadPoolHousekeeper where - T: 'static, + T: InnerSync + 'static, { - pub(crate) fn new(inner: Weak) -> Self { + fn new(inner: Weak, periodical_sync_enable: bool) -> Self { use super::thread_pool::PoolName; let thread_pool = ThreadPoolRegistry::acquire_pool(PoolName::Housekeeper); @@ -90,18 +173,22 @@ where let is_shutting_down = Arc::new(AtomicBool::new(false)); let periodical_sync_running = Arc::new(Mutex::new(())); - let sync_job = Self::start_periodical_sync_job( - &thread_pool, - Arc::clone(&inner_ptr), - Arc::clone(&is_shutting_down), - Arc::clone(&periodical_sync_running), - ); + let maybe_sync_job = if periodical_sync_enable { + Some(Self::start_periodical_sync_job( + &thread_pool, + Arc::clone(&inner_ptr), + Arc::clone(&is_shutting_down), + Arc::clone(&periodical_sync_running), + )) + } else { + None + }; Self { inner: inner_ptr, thread_pool, is_shutting_down, - periodical_sync_job: Mutex::new(Some(sync_job)), + periodical_sync_job: Mutex::new(maybe_sync_job), periodical_sync_running, on_demand_sync_scheduled: 
Arc::new(AtomicBool::new(false)), _marker: PhantomData::default(), @@ -139,9 +226,7 @@ where .execute_with_dynamic_delay(initial_delay, housekeeper_closure) } - pub(crate) fn try_schedule_sync(&self) -> bool { - // TODO: Check if these `Orderings` are correct. - + fn try_schedule_sync(&self) -> bool { // If shutting down, do not schedule the task. if self.is_shutting_down.load(Ordering::Acquire) { return false; @@ -169,13 +254,14 @@ where } #[cfg(test)] - pub(crate) fn periodical_sync_job(&self) -> &Mutex> { - &self.periodical_sync_job + pub(crate) fn stop_periodical_sync_job(&self) { + if let Some(j) = self.periodical_sync_job.lock().take() { + j.cancel(); + } } } -// private functions/methods -impl Housekeeper { +impl ThreadPoolHousekeeper { fn call_sync(unsafe_weak_ptr: &Arc>>) -> Option { let lock = unsafe_weak_ptr.lock(); // Restore the Weak pointer to Inner. diff --git a/src/common/concurrent/thread_pool.rs b/src/common/concurrent/thread_pool.rs index f900a15f..d652ff10 100644 --- a/src/common/concurrent/thread_pool.rs +++ b/src/common/concurrent/thread_pool.rs @@ -33,6 +33,24 @@ pub(crate) struct ThreadPool { // pub(crate) num_threads: usize, } +impl ThreadPool { + fn new(name: PoolName, num_threads: usize) -> Self { + // println!("Created pool: {:?}", name); + let pool = ScheduledThreadPool::with_name(name.thread_name_template(), num_threads); + Self { + name, + pool, + // num_threads, + } + } +} + +// impl Drop for ThreadPool { +// fn drop(&mut self) { +// println!("Dropped pool: {:?}", self.name) +// } +// } + pub(crate) struct ThreadPoolRegistry { pools: RwLock>>, } @@ -64,14 +82,8 @@ impl ThreadPoolRegistry { // https://github.com/moka-rs/moka/pull/39#issuecomment-916888859 // https://github.com/seanmonstar/num_cpus/issues/69 let num_threads = num_cpus::get().max(1); - let pool = - ScheduledThreadPool::with_name(name.thread_name_template(), num_threads); - let t_pool = ThreadPool { - name, - pool, - // num_threads, - }; - Arc::new(t_pool) + let pool = 
ThreadPool::new(name, num_threads); + Arc::new(pool) }); } } diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 624fe637..bd04c2b5 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -10,7 +10,7 @@ use crate::{ }, deques::Deques, entry_info::EntryInfo, - housekeeper::{Housekeeper, InnerSync, SyncPace}, + housekeeper::{self, Housekeeper, InnerSync, SyncPace}, AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, }, @@ -111,7 +111,8 @@ where time_to_live, time_to_idle, )); - let housekeeper = Housekeeper::new(Arc::downgrade(&inner)); + let h_cfg = housekeeper::Configuration::new_thread_pool(true); + let housekeeper = Housekeeper::new(Arc::downgrade(&inner), h_cfg); Self { inner, read_op_ch: r_snd, @@ -166,19 +167,24 @@ where let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); let now = i.current_time_from_expiration_clock(); - let entry = &*entry; + let arc_entry = &*entry; - if is_expired_entry_wo(ttl, va, entry, now) - || is_expired_entry_ao(tti, va, entry, now) + if is_expired_entry_wo(ttl, va, arc_entry, now) + || is_expired_entry_ao(tti, va, arc_entry, now) { + // Drop the entry to avoid to deadlock with record_read_op. + std::mem::drop(entry); // Expired or invalidated entry. Record this access as a cache miss // rather than a hit. record(ReadOp::Miss(hash)); None } else { // Valid entry. - let v = entry.value.clone(); - record(ReadOp::Hit(hash, TrioArc::clone(entry), now)); + let v = arc_entry.value.clone(); + let e = TrioArc::clone(arc_entry); + // Drop the entry to avoid to deadlock with record_read_op. 
+ std::mem::drop(entry); + record(ReadOp::Hit(hash, e, now)); Some(v) } } @@ -196,6 +202,7 @@ where #[inline] pub(crate) fn apply_reads_writes_if_needed( + inner: &impl InnerSync, ch: &Sender>, housekeeper: Option<&HouseKeeperArc>, ) { @@ -203,7 +210,7 @@ where if Self::should_apply_writes(w_len) { if let Some(h) = housekeeper { - h.try_schedule_sync(); + h.try_sync(inner); } } } @@ -246,7 +253,7 @@ where { #[inline] fn record_read_op(&self, op: ReadOp) -> Result<(), TrySendError>> { - self.apply_reads_if_needed(); + self.apply_reads_if_needed(self.inner.as_ref()); let ch = &self.read_op_ch; match ch.try_send(op) { // Discard the ReadOp when the channel is full. @@ -330,24 +337,24 @@ where } #[inline] - fn apply_reads_if_needed(&self) { + fn apply_reads_if_needed(&self, inner: &impl InnerSync) { let len = self.read_op_ch.len(); if Self::should_apply_reads(len) { if let Some(h) = &self.housekeeper { - h.try_schedule_sync(); + h.try_sync(inner); } } } #[inline] fn should_apply_reads(ch_len: usize) -> bool { - ch_len >= READ_LOG_FLUSH_POINT + ch_len >= READ_LOG_FLUSH_POINT / 8 } #[inline] fn should_apply_writes(ch_len: usize) -> bool { - ch_len >= WRITE_LOG_FLUSH_POINT + ch_len >= WRITE_LOG_FLUSH_POINT / 8 } } @@ -364,11 +371,7 @@ where pub(crate) fn reconfigure_for_testing(&mut self) { // Stop the housekeeping job that may cause sync() method to return earlier. if let Some(housekeeper) = &self.housekeeper { - // TODO: Extract this into a housekeeper method. - let mut job = housekeeper.periodical_sync_job().lock(); - if let Some(job) = job.take() { - job.cancel(); - } + housekeeper.stop_periodical_sync_job(); } // Enable the frequency sketch. 
self.inner.enable_frequency_sketch_for_testing(); diff --git a/src/dash/cache.rs b/src/dash/cache.rs index 2d7832f1..9f4ee022 100644 --- a/src/dash/cache.rs +++ b/src/dash/cache.rs @@ -450,7 +450,8 @@ where pub(crate) fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { let op = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to insert"); + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) + .expect("Failed to insert"); } /// Discards any cached value for the key. @@ -465,7 +466,8 @@ where if let Some(kv) = self.base.remove_entry(key) { let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to remove"); + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) + .expect("Failed to remove"); } } @@ -563,6 +565,7 @@ where { #[inline] fn schedule_write_op( + inner: &impl InnerSync, ch: &Sender>, op: WriteOp, housekeeper: Option<&HouseKeeperArc>, @@ -574,7 +577,7 @@ where // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`, // but we got a notable performance degradation. 
loop { - BaseCache::apply_reads_writes_if_needed(ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { diff --git a/src/future/builder.rs b/src/future/builder.rs index fcb536f5..452d32bd 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -122,6 +122,7 @@ where self.time_to_live, self.time_to_idle, self.invalidator_enabled, + builder_utils::housekeeper_conf(true), ) } @@ -148,6 +149,7 @@ where self.time_to_live, self.time_to_idle, self.invalidator_enabled, + builder_utils::housekeeper_conf(true), ) } } diff --git a/src/future/cache.rs b/src/future/cache.rs index 3defc6df..f0e42753 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -5,7 +5,7 @@ use super::{ use crate::{ common::concurrent::{ constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, - housekeeper::InnerSync, + housekeeper::{self, InnerSync}, Weigher, WriteOp, }, notification::{self, EvictionListener}, @@ -660,6 +660,7 @@ where None, None, false, + housekeeper::Configuration::new_thread_pool(true), ) } @@ -691,6 +692,7 @@ where time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + housekeeper_conf: housekeeper::Configuration, ) -> Self { Self { base: BaseCache::new( @@ -704,6 +706,7 @@ where time_to_live, time_to_idle, invalidator_enabled, + housekeeper_conf, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), } @@ -834,10 +837,10 @@ where /// /// # Panics /// - /// This method panics when the `init` future has been panicked. When it happens, - /// only the caller whose `init` future panicked will get the panic (e.g. only - /// task 3 in the above sample). If there are other calls in progress (e.g. task - /// 0, 1 and 2 above), this method will restart and resolve one of the remaining + /// This method panics when the `init` future has panicked. 
When it happens, only + /// the caller whose `init` future panicked will get the panic (e.g. only task 3 + /// in the above sample). If there are other calls in progress (e.g. task 0, 1 + /// and 2 above), this method will restart and resolve one of the remaining /// `init` futures. /// pub async fn get_with(&self, key: K, init: impl Future) -> V { @@ -952,10 +955,10 @@ where /// /// # Panics /// - /// This method panics when the `init` future has been panicked. When it happens, - /// only the caller whose `init` future panicked will get the panic (e.g. only - /// task 2 in the above sample). If there are other calls in progress (e.g. task - /// 0, 1 and 3 above), this method will restart and resolve one of the remaining + /// This method panics when the `init` future has panicked. When it happens, only + /// the caller whose `init` future panicked will get the panic (e.g. only task 2 + /// in the above sample). If there are other calls in progress (e.g. task 0, 1 + /// and 3 above), this method will restart and resolve one of the remaining /// `init` futures. /// pub async fn try_get_with(&self, key: K, init: F) -> Result> @@ -983,7 +986,8 @@ where let key = Arc::new(key); let op = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::blocking_schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to insert"); + Self::blocking_schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) + .expect("Failed to insert"); } /// Discards any cached value for the key. 
@@ -1002,7 +1006,7 @@ where } let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk) + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) .await .expect("Failed to remove"); } @@ -1017,8 +1021,13 @@ where if let Some(kv) = self.base.remove_entry(key, hash) { let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); - Self::blocking_schedule_write_op(&self.base.write_op_ch, op, hk) - .expect("Failed to remove"); + Self::blocking_schedule_write_op( + self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + hk, + ) + .expect("Failed to remove"); } } @@ -1243,13 +1252,14 @@ where async fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { let op = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk) + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) .await .expect("Failed to insert"); } #[inline] async fn schedule_write_op( + inner: &impl InnerSync, ch: &Sender>, op: WriteOp, housekeeper: Option<&HouseKeeperArc>, @@ -1259,7 +1269,7 @@ where // TODO: Try to replace the timer with an async event listener to see if it // can provide better performance. 
loop { - BaseCache::apply_reads_writes_if_needed(ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { @@ -1275,6 +1285,7 @@ where #[inline] fn blocking_schedule_write_op( + inner: &impl InnerSync, ch: &Sender>, op: WriteOp, housekeeper: Option<&HouseKeeperArc>, @@ -1282,7 +1293,7 @@ where let mut op = op; loop { - BaseCache::apply_reads_writes_if_needed(ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 1c708c6d..638e6659 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -56,6 +56,7 @@ pub struct CacheBuilder { time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + thread_pool_enabled: bool, cache_type: PhantomData, } @@ -76,6 +77,8 @@ where time_to_live: None, time_to_idle: None, invalidator_enabled: false, + // TODO: Change this to `false` in Moka 0.10.0. 
+            thread_pool_enabled: true,
             cache_type: Default::default(),
         }
     }
 }
@@ -117,6 +120,7 @@ where
             time_to_live: self.time_to_live,
             time_to_idle: self.time_to_idle,
             invalidator_enabled: self.invalidator_enabled,
+            thread_pool_enabled: self.thread_pool_enabled,
             cache_type: PhantomData::default(),
         }
     }
@@ -145,6 +149,7 @@ where
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
+            builder_utils::housekeeper_conf(self.thread_pool_enabled),
         )
     }
 
@@ -174,6 +179,7 @@ where
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
+            builder_utils::housekeeper_conf(self.thread_pool_enabled),
         )
     }
 }
@@ -208,6 +214,7 @@ where
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
+            builder_utils::housekeeper_conf(self.thread_pool_enabled),
         )
     }
 
@@ -238,6 +245,7 @@ where
             self.time_to_live,
             self.time_to_idle,
             self.invalidator_enabled,
+            builder_utils::housekeeper_conf(true),
         )
     }
 }
@@ -380,6 +388,18 @@ impl CacheBuilder {
             ..self
         }
     }
+
+    /// Specify whether or not to enable the thread pool for housekeeping tasks,
+    /// such as applying pending read/write operation logs. `true` to enable and
+    /// `false` to disable.
+    ///
+    /// The thread pool is enabled by default in the current version of Moka, but
+    /// the default will be changed to disabled in a future version.
+ pub fn thread_pool_enabled(self, v: bool) -> Self { + Self { + thread_pool_enabled: v, + ..self + } + } } #[cfg(test)] diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 13a90adc..68c83ff4 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -5,7 +5,7 @@ use super::{ use crate::{ common::concurrent::{ constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, - housekeeper::InnerSync, + housekeeper::{self, InnerSync}, Weigher, WriteOp, }, notification::{self, EvictionListener}, @@ -803,6 +803,7 @@ where /// [builder-struct]: ./struct.CacheBuilder.html pub fn new(max_capacity: u64) -> Self { let build_hasher = RandomState::default(); + let housekeeper_conf = housekeeper::Configuration::new_thread_pool(true); Self::with_everything( None, Some(max_capacity), @@ -814,6 +815,7 @@ where None, None, false, + housekeeper_conf, ) } @@ -845,6 +847,7 @@ where time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + housekeeper_conf: housekeeper::Configuration, ) -> Self { Self { base: BaseCache::new( @@ -858,6 +861,7 @@ where time_to_live, time_to_idle, invalidator_enabled, + housekeeper_conf, ), value_initializer: Arc::new(ValueInitializer::with_hasher(build_hasher)), } @@ -996,11 +1000,11 @@ where /// /// # Panics /// - /// This method panics when the `init` closure has been panicked. When it - /// happens, only the caller whose `init` closure panicked will get the panic - /// (e.g. only thread 1 in the above sample). If there are other calls in - /// progress (e.g. thread 0, 2 and 3 above), this method will restart and resolve - /// one of the remaining `init` closure. + /// This method panics when the `init` closure has panicked. When it happens, + /// only the caller whose `init` closure panicked will get the panic (e.g. only + /// thread 1 in the above sample). If there are other calls in progress (e.g. + /// thread 0, 2 and 3 above), this method will restart and resolve one of the + /// remaining `init` closure. 
/// pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V { let hash = self.base.hash(&key); @@ -1134,11 +1138,11 @@ where /// /// # Panics /// - /// This method panics when the `init` closure has been panicked. When it - /// happens, only the caller whose `init` closure panicked will get the panic - /// (e.g. only thread 1 in the above sample). If there are other calls in - /// progress (e.g. thread 0, 2 and 3 above), this method will restart and resolve - /// one of the remaining `init` closure. + /// This method panics when the `init` closure has panicked. When it happens, + /// only the caller whose `init` closure panicked will get the panic (e.g. only + /// thread 1 in the above sample). If there are other calls in progress (e.g. + /// thread 0, 2 and 3 above), this method will restart and resolve one of the + /// remaining `init` closure. /// pub fn try_get_with(&self, key: K, init: F) -> Result> where @@ -1191,7 +1195,8 @@ where pub(crate) fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { let op = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to insert"); + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) + .expect("Failed to insert"); } /// Discards any cached value for the key. @@ -1244,7 +1249,8 @@ where let op = WriteOp::Remove(kv); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to remove"); + Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) + .expect("Failed to remove"); } } @@ -1415,6 +1421,7 @@ where { #[inline] fn schedule_write_op( + inner: &impl InnerSync, ch: &Sender>, op: WriteOp, housekeeper: Option<&HouseKeeperArc>, @@ -1426,7 +1433,7 @@ where // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`, // but we got a notable performance degradation. 
loop { - BaseCache::apply_reads_writes_if_needed(ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { diff --git a/src/sync/segment.rs b/src/sync/segment.rs index 1aa984a8..86510cd5 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1,6 +1,6 @@ use super::{cache::Cache, CacheBuilder, ConcurrentCacheExt}; use crate::{ - common::concurrent::Weigher, + common::concurrent::{housekeeper, Weigher}, notification::{self, EvictionListener}, sync_base::iter::{Iter, ScanningGet}, Policy, PredicateError, @@ -107,6 +107,7 @@ where None, None, false, + housekeeper::Configuration::new_thread_pool(true), ) } @@ -216,6 +217,7 @@ where time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + housekeeper_conf: housekeeper::Configuration, ) -> Self { Self { inner: Arc::new(Inner::new( @@ -230,6 +232,7 @@ where time_to_live, time_to_idle, invalidator_enabled, + housekeeper_conf, )), } } @@ -597,6 +600,7 @@ where time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + housekeeper_conf: housekeeper::Configuration, ) -> Self { assert!(num_segments > 0); @@ -620,6 +624,7 @@ where time_to_live, time_to_idle, invalidator_enabled, + housekeeper_conf.clone(), ) }) .collect::>(); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index b659bde4..9fea802e 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -16,7 +16,7 @@ use crate::{ }, deques::Deques, entry_info::EntryInfo, - housekeeper::{Housekeeper, InnerSync, SyncPace}, + housekeeper::{self, Housekeeper, InnerSync, SyncPace}, AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, ReadOp, ValueEntry, Weigher, WriteOp, }, @@ -156,6 +156,7 @@ where time_to_live: Option, time_to_idle: Option, invalidator_enabled: bool, + housekeeper_conf: housekeeper::Configuration, ) -> Self { let (r_snd, r_rcv) = crossbeam_channel::bounded(READ_LOG_SIZE); let (w_snd, 
w_rcv) = crossbeam_channel::bounded(WRITE_LOG_SIZE); @@ -176,7 +177,7 @@ where if invalidator_enabled { inner.set_invalidator(&inner); } - let housekeeper = Housekeeper::new(Arc::downgrade(&inner)); + let housekeeper = Housekeeper::new(Arc::downgrade(&inner), housekeeper_conf); Self { inner, read_op_ch: r_snd, @@ -270,6 +271,7 @@ where #[inline] pub(crate) fn apply_reads_writes_if_needed( + inner: &impl InnerSync, ch: &Sender>, housekeeper: Option<&HouseKeeperArc>, ) { @@ -277,7 +279,7 @@ where if Self::should_apply_writes(w_len) { if let Some(h) = housekeeper { - h.try_schedule_sync(); + h.try_sync(inner); } } } @@ -345,7 +347,7 @@ where { #[inline] fn record_read_op(&self, op: ReadOp) -> Result<(), TrySendError>> { - self.apply_reads_if_needed(); + self.apply_reads_if_needed(&self.inner); let ch = &self.read_op_ch; match ch.try_send(op) { // Discard the ReadOp when the channel is full. @@ -482,24 +484,24 @@ where } #[inline] - fn apply_reads_if_needed(&self) { + fn apply_reads_if_needed(&self, inner: &Inner) { let len = self.read_op_ch.len(); if Self::should_apply_reads(len) { if let Some(h) = &self.housekeeper { - h.try_schedule_sync(); + h.try_sync(inner); } } } #[inline] fn should_apply_reads(ch_len: usize) -> bool { - ch_len >= READ_LOG_FLUSH_POINT + ch_len >= READ_LOG_FLUSH_POINT / 8 } #[inline] fn should_apply_writes(ch_len: usize) -> bool { - ch_len >= WRITE_LOG_FLUSH_POINT + ch_len >= WRITE_LOG_FLUSH_POINT / 8 } } @@ -520,11 +522,7 @@ where pub(crate) fn reconfigure_for_testing(&mut self) { // Stop the housekeeping job that may cause sync() method to return earlier. if let Some(housekeeper) = &self.housekeeper { - // TODO: Extract this into a housekeeper method. - let mut job = housekeeper.periodical_sync_job().lock(); - if let Some(job) = job.take() { - job.cancel(); - } + housekeeper.stop_periodical_sync_job(); } // Enable the frequency sketch. 
self.inner.enable_frequency_sketch_for_testing(); @@ -2017,6 +2015,8 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { + use crate::common::concurrent::housekeeper; + use super::BaseCache; #[cfg_attr(target_pointer_width = "16", ignore)] @@ -2039,6 +2039,7 @@ mod tests { None, None, false, + housekeeper::Configuration::new_thread_pool(true), ); cache.inner.enable_frequency_sketch_for_testing(); assert_eq!( From bb38d45bb9b97cbe5676905a1d94af62ba8f10e1 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 23 Jul 2022 17:40:37 +0800 Subject: [PATCH 02/15] Optionally disable thread pools in `sync::Cache` Update `BlockingHousekeeper` to trigger applying read and/or write operations not only when the channel len becomes long enough but also when a certain amount of time has passed. --- src/common/concurrent/housekeeper.rs | 76 ++++++++++++++++++++++++++-- src/sync_base/base_cache.rs | 20 ++++---- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index c62341dd..6af77e97 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -1,6 +1,9 @@ +use crate::common::time::{CheckedTimeOps, Instant}; + +use super::atomic_time::AtomicInstant; use super::constants::{ MAX_SYNC_REPEATS, PERIODICAL_SYNC_FAST_PACE_NANOS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, - PERIODICAL_SYNC_NORMAL_PACE_MILLIS, + PERIODICAL_SYNC_NORMAL_PACE_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, }; use super::{ thread_pool::{ThreadPool, ThreadPoolRegistry}, @@ -37,10 +40,10 @@ impl Configuration { } } - pub(crate) fn new_thread_pool(periodical_sync_enable: bool) -> Self { + pub(crate) fn new_thread_pool(periodical_sync_enabled: bool) -> Self { Self { is_blocking: false, - periodical_sync_enabled: periodical_sync_enable, + periodical_sync_enabled, } } } @@ -65,6 +68,20 @@ where } } + pub(crate) fn should_apply_reads(&self, ch_len: usize) -> bool { + match self { + 
Housekeeper::Blocking(h) => h.should_apply_reads(ch_len),
+            Housekeeper::ThreadPool(h) => h.should_apply_reads(ch_len),
+        }
+    }
+
+    pub(crate) fn should_apply_writes(&self, ch_len: usize) -> bool {
+        match self {
+            Housekeeper::Blocking(h) => h.should_apply_writes(ch_len),
+            Housekeeper::ThreadPool(h) => h.should_apply_writes(ch_len),
+        }
+    }
+
     pub(crate) fn try_sync(&self, cache: &impl InnerSync) -> bool {
         match self {
             Housekeeper::Blocking(h) => h.try_sync(cache),
@@ -81,12 +98,47 @@ where
     }
 }
 
-#[derive(Default)]
 pub(crate) struct BlockingHousekeeper {
     is_sync_running: AtomicBool,
+    sync_after: AtomicInstant,
+}
+
+impl Default for BlockingHousekeeper {
+    fn default() -> Self {
+        Self {
+            is_sync_running: Default::default(),
+            sync_after: AtomicInstant::new(Self::sync_after(Instant::now())),
+        }
+    }
 }
 
 impl BlockingHousekeeper {
+    // NOTE: This method may update the `sync_after` field.
+    fn should_apply_reads(&self, ch_len: usize) -> bool {
+        self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8)
+    }
+
+    // NOTE: This method may update the `sync_after` field.
+    fn should_apply_writes(&self, ch_len: usize) -> bool {
+        self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8)
+    }
+
+    // NOTE: This method may update the `sync_after` field.
+    #[inline]
+    fn should_apply(&self, ch_len: usize, ch_flush_point: usize) -> bool {
+        if ch_len >= ch_flush_point {
+            true
+        } else {
+            let now = Instant::now();
+            if self.sync_after.instant().unwrap() <= now {
+                self.sync_after.set_instant(Self::sync_after(now));
+                true
+            } else {
+                false
+            }
+        }
+    }
+
     fn try_sync(&self, cache: &T) -> bool {
         // Try to flip the value of sync_scheduled from false to true.
match self.is_sync_running.compare_exchange( @@ -103,6 +155,14 @@ impl BlockingHousekeeper { Err(_) => false, } } + + fn sync_after(now: Instant) -> Instant { + let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS); + let ts = now.checked_add(dur); + // Assuming that `now` is current wall clock time, this should never fail at + // least next millions of years. + ts.expect("Timestamp overflow") + } } #[derive(PartialEq, Eq)] @@ -226,6 +286,14 @@ where .execute_with_dynamic_delay(initial_delay, housekeeper_closure) } + fn should_apply_reads(&self, ch_len: usize) -> bool { + ch_len >= READ_LOG_FLUSH_POINT + } + + fn should_apply_writes(&self, ch_len: usize) -> bool { + ch_len >= WRITE_LOG_FLUSH_POINT + } + fn try_schedule_sync(&self) -> bool { // If shutting down, do not schedule the task. if self.is_shutting_down.load(Ordering::Acquire) { diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 5f28d900..241c5231 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -277,9 +277,9 @@ where ) { let w_len = ch.len(); - if Self::should_apply_writes(w_len) { - if let Some(h) = housekeeper { - h.try_sync(inner); + if let Some(hk) = housekeeper { + if Self::should_apply_writes(hk, w_len) { + hk.try_sync(inner); } } } @@ -487,21 +487,21 @@ where fn apply_reads_if_needed(&self, inner: &Inner) { let len = self.read_op_ch.len(); - if Self::should_apply_reads(len) { - if let Some(h) = &self.housekeeper { - h.try_sync(inner); + if let Some(hk) = &self.housekeeper { + if Self::should_apply_reads(hk, len) { + hk.try_sync(inner); } } } #[inline] - fn should_apply_reads(ch_len: usize) -> bool { - ch_len >= READ_LOG_FLUSH_POINT / 8 + fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize) -> bool { + hk.should_apply_reads(ch_len) } #[inline] - fn should_apply_writes(ch_len: usize) -> bool { - ch_len >= WRITE_LOG_FLUSH_POINT / 8 + fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize) -> bool { + 
hk.should_apply_writes(ch_len) } } From c30b85707373a493425d57b28fd47803d429f976 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 23 Jul 2022 20:06:17 +0800 Subject: [PATCH 03/15] Optionally disable thread pools in `sync::Cache` Add `#[cfg(..)]` to some methods in `housekeeper` module to prevent compiler warnings for unused methods. --- src/common/concurrent/housekeeper.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 6af77e97..036a87a2 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -1,15 +1,21 @@ -use crate::common::time::{CheckedTimeOps, Instant}; - -use super::atomic_time::AtomicInstant; use super::constants::{ MAX_SYNC_REPEATS, PERIODICAL_SYNC_FAST_PACE_NANOS, PERIODICAL_SYNC_INITIAL_DELAY_MILLIS, - PERIODICAL_SYNC_NORMAL_PACE_MILLIS, READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT, + PERIODICAL_SYNC_NORMAL_PACE_MILLIS, }; use super::{ thread_pool::{ThreadPool, ThreadPoolRegistry}, unsafe_weak_pointer::UnsafeWeakPointer, }; +#[cfg(any(feature = "sync", feature = "future"))] +use super::atomic_time::AtomicInstant; + +#[cfg(any(feature = "sync", feature = "future"))] +use crate::common::time::{CheckedTimeOps, Instant}; + +#[cfg(any(feature = "sync", feature = "future"))] +use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT}; + use parking_lot::Mutex; use scheduled_thread_pool::JobHandle; use std::{ @@ -68,6 +74,7 @@ where } } + #[cfg(any(feature = "sync", feature = "future"))] pub(crate) fn should_apply_reads(&self, ch_len: usize) -> bool { match self { Housekeeper::Blocking(h) => h.should_apply_reads(ch_len), @@ -75,6 +82,7 @@ where } } + #[cfg(any(feature = "sync", feature = "future"))] pub(crate) fn should_apply_writes(&self, ch_len: usize) -> bool { match self { Housekeeper::Blocking(h) => h.should_apply_writes(ch_len), @@ -100,6 +108,7 @@ where pub(crate) struct 
BlockingHousekeeper { is_sync_running: AtomicBool, + #[cfg(any(feature = "sync", feature = "future"))] sync_after: AtomicInstant, } @@ -107,22 +116,26 @@ impl Default for BlockingHousekeeper { fn default() -> Self { Self { is_sync_running: Default::default(), + #[cfg(any(feature = "sync", feature = "future"))] sync_after: AtomicInstant::new(Self::sync_after(Instant::now())), } } } impl BlockingHousekeeper { + #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. fn should_apply_reads(&self, ch_len: usize) -> bool { self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8) } + #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. fn should_apply_writes(&self, ch_len: usize) -> bool { self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8) } + #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. #[inline] fn should_apply(&self, ch_len: usize, ch_flush_point: usize) -> bool { @@ -156,6 +169,7 @@ impl BlockingHousekeeper { } } + #[cfg(any(feature = "sync", feature = "future"))] fn sync_after(now: Instant) -> Instant { let dur = Duration::from_millis(PERIODICAL_SYNC_INITIAL_DELAY_MILLIS); let ts = now.checked_add(dur); @@ -286,10 +300,12 @@ where .execute_with_dynamic_delay(initial_delay, housekeeper_closure) } + #[cfg(any(feature = "sync", feature = "future"))] fn should_apply_reads(&self, ch_len: usize) -> bool { ch_len >= READ_LOG_FLUSH_POINT } + #[cfg(any(feature = "sync", feature = "future"))] fn should_apply_writes(&self, ch_len: usize) -> bool { ch_len >= WRITE_LOG_FLUSH_POINT } From bca9293ecde4cca323f0dd1557ff5a29faa11d04 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Tue, 26 Jul 2022 00:41:59 +0000 Subject: [PATCH 04/15] Optionally disable thread pools in `sync::Cache` Try not to get the current time twice in a write path. 
--- src/common/concurrent/housekeeper.rs | 37 ++++++++-------- src/future/cache.rs | 58 ++++++++++++++++++------- src/sync/cache.rs | 37 +++++++++++----- src/sync_base/base_cache.rs | 65 ++++++++++++++++++---------- 4 files changed, 127 insertions(+), 70 deletions(-) diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 036a87a2..abfd90c8 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -75,18 +75,18 @@ where } #[cfg(any(feature = "sync", feature = "future"))] - pub(crate) fn should_apply_reads(&self, ch_len: usize) -> bool { + pub(crate) fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool { match self { - Housekeeper::Blocking(h) => h.should_apply_reads(ch_len), - Housekeeper::ThreadPool(h) => h.should_apply_reads(ch_len), + Housekeeper::Blocking(h) => h.should_apply_reads(ch_len, now), + Housekeeper::ThreadPool(h) => h.should_apply_reads(ch_len, now), } } #[cfg(any(feature = "sync", feature = "future"))] - pub(crate) fn should_apply_writes(&self, ch_len: usize) -> bool { + pub(crate) fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool { match self { - Housekeeper::Blocking(h) => h.should_apply_writes(ch_len), - Housekeeper::ThreadPool(h) => h.should_apply_writes(ch_len), + Housekeeper::Blocking(h) => h.should_apply_writes(ch_len, now), + Housekeeper::ThreadPool(h) => h.should_apply_writes(ch_len, now), } } @@ -125,30 +125,27 @@ impl Default for BlockingHousekeeper { impl BlockingHousekeeper { #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. - fn should_apply_reads(&self, ch_len: usize) -> bool { - self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8) + fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool { + self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now) } #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. 
- fn should_apply_writes(&self, ch_len: usize) -> bool { - self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8) + fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool { + self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now) } #[cfg(any(feature = "sync", feature = "future"))] // NOTE: This method may update the `sync_after` field. #[inline] - fn should_apply(&self, ch_len: usize, ch_flush_point: usize) -> bool { + fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool { if ch_len >= ch_flush_point { true + } else if self.sync_after.instant().unwrap() >= now { + self.sync_after.set_instant(Self::sync_after(now)); + true } else { - let now = Instant::now(); - if self.sync_after.instant().unwrap() >= now { - self.sync_after.set_instant(Self::sync_after(now)); - true - } else { - false - } + false } } @@ -301,12 +298,12 @@ where } #[cfg(any(feature = "sync", feature = "future"))] - fn should_apply_reads(&self, ch_len: usize) -> bool { + fn should_apply_reads(&self, ch_len: usize, _now: Instant) -> bool { ch_len >= READ_LOG_FLUSH_POINT } #[cfg(any(feature = "sync", feature = "future"))] - fn should_apply_writes(&self, ch_len: usize) -> bool { + fn should_apply_writes(&self, ch_len: usize, _now: Instant) -> bool { ch_len >= WRITE_LOG_FLUSH_POINT } diff --git a/src/future/cache.rs b/src/future/cache.rs index 8df20100..adec9889 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -3,10 +3,13 @@ use super::{ CacheBuilder, ConcurrentCacheExt, Iter, PredicateId, }; use crate::{ - common::concurrent::{ - constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, - housekeeper::{self, InnerSync}, - Weigher, WriteOp, + common::{ + concurrent::{ + constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, + housekeeper::{self, InnerSync}, + Weigher, WriteOp, + }, + time::Instant, }, notification::{self, EvictionListener}, sync_base::base_cache::{BaseCache, HouseKeeperArc}, @@ -984,10 +987,16 @@ where fn 
do_blocking_insert(&self, key: K, value: V) { let hash = self.base.hash(&key); let key = Arc::new(key); - let op = self.base.do_insert_with_hash(key, hash, value); + let (op, now) = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::blocking_schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) - .expect("Failed to insert"); + Self::blocking_schedule_write_op( + self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + now, + hk, + ) + .expect("Failed to insert"); } /// Discards any cached value for the key. @@ -1005,10 +1014,17 @@ where self.base.notify_invalidate(&kv.key, &kv.entry) } let op = WriteOp::Remove(kv); + let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) - .await - .expect("Failed to remove"); + Self::schedule_write_op( + self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + now, + hk, + ) + .await + .expect("Failed to remove"); crossbeam_epoch::pin().flush(); } } @@ -1021,11 +1037,13 @@ where let hash = self.base.hash(key); if let Some(kv) = self.base.remove_entry(key, hash) { let op = WriteOp::Remove(kv); + let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); Self::blocking_schedule_write_op( self.base.inner.as_ref(), &self.base.write_op_ch, op, + now, hk, ) .expect("Failed to remove"); @@ -1256,11 +1274,17 @@ where } async fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { - let op = self.base.do_insert_with_hash(key, hash, value); + let (op, now) = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) - .await - .expect("Failed to insert"); + Self::schedule_write_op( + self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + now, + hk, + ) + .await + .expect("Failed 
to insert"); } #[inline] @@ -1268,6 +1292,7 @@ where inner: &impl InnerSync, ch: &Sender>, op: WriteOp, + now: Instant, housekeeper: Option<&HouseKeeperArc>, ) -> Result<(), TrySendError>> { let mut op = op; @@ -1275,7 +1300,7 @@ where // TODO: Try to replace the timer with an async event listener to see if it // can provide better performance. loop { - BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { @@ -1294,12 +1319,13 @@ where inner: &impl InnerSync, ch: &Sender>, op: WriteOp, + now: Instant, housekeeper: Option<&HouseKeeperArc>, ) -> Result<(), TrySendError>> { let mut op = op; loop { - BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 1d1a9092..29b38d8c 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -3,10 +3,13 @@ use super::{ CacheBuilder, ConcurrentCacheExt, }; use crate::{ - common::concurrent::{ - constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, - housekeeper::{self, InnerSync}, - Weigher, WriteOp, + common::{ + concurrent::{ + constants::{MAX_SYNC_REPEATS, WRITE_RETRY_INTERVAL_MICROS}, + housekeeper::{self, InnerSync}, + Weigher, WriteOp, + }, + time::Instant, }, notification::{self, EvictionListener}, sync::{Iter, PredicateId}, @@ -1198,10 +1201,16 @@ where } pub(crate) fn insert_with_hash(&self, key: Arc, hash: u64, value: V) { - let op = self.base.do_insert_with_hash(key, hash, value); + let (op, now) = self.base.do_insert_with_hash(key, hash, value); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) - .expect("Failed to insert"); + Self::schedule_write_op( + 
self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + now, + hk, + ) + .expect("Failed to insert"); } /// Discards any cached value for the key. @@ -1253,9 +1262,16 @@ where std::mem::drop(kl); let op = WriteOp::Remove(kv); + let now = self.base.current_time_from_expiration_clock(); let hk = self.base.housekeeper.as_ref(); - Self::schedule_write_op(self.base.inner.as_ref(), &self.base.write_op_ch, op, hk) - .expect("Failed to remove"); + Self::schedule_write_op( + self.base.inner.as_ref(), + &self.base.write_op_ch, + op, + now, + hk, + ) + .expect("Failed to remove"); crossbeam_epoch::pin().flush(); } } @@ -1430,6 +1446,7 @@ where inner: &impl InnerSync, ch: &Sender>, op: WriteOp, + now: Instant, housekeeper: Option<&HouseKeeperArc>, ) -> Result<(), TrySendError>> { let mut op = op; @@ -1439,7 +1456,7 @@ where // - We are doing a busy-loop here. We were originally calling `ch.send(op)?`, // but we got a notable performance degradation. loop { - BaseCache::apply_reads_writes_if_needed(inner, ch, housekeeper); + BaseCache::apply_reads_writes_if_needed(inner, ch, now, housekeeper); match ch.try_send(op) { Ok(()) => break, Err(TrySendError::Full(op1)) => { diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 9f1a1c4b..daa30d58 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -113,6 +113,11 @@ impl BaseCache { self.inner.is_blocking_removal_notification() } + #[inline] + pub(crate) fn current_time_from_expiration_clock(&self) -> Instant { + self.inner.current_time_from_expiration_clock() + } + pub(crate) fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) where K: Send + Sync + 'static, @@ -204,7 +209,7 @@ where .get_key_value_and(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = i.current_time_from_expiration_clock(); + let now = self.current_time_from_expiration_clock(); !is_expired_entry_wo(ttl, va, entry, now) && 
!is_expired_entry_ao(tti, va, entry, now) @@ -219,14 +224,15 @@ where Q: Hash + Eq + ?Sized, { // Define a closure to record a read op. - let record = |op| { - self.record_read_op(op).expect("Failed to record a get op"); + let record = |op, now| { + self.record_read_op(op, now) + .expect("Failed to record a get op"); }; let maybe_entry = self.inner.get_key_value_and_then(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now = i.current_time_from_expiration_clock(); + let now = self.current_time_from_expiration_clock(); if is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now) @@ -242,10 +248,11 @@ where if let Some((entry, now)) = maybe_entry { let v = entry.value.clone(); - record(ReadOp::Hit(hash, entry, now)); + record(ReadOp::Hit(hash, entry, now), now); Some(v) } else { - record(ReadOp::Miss(hash)); + let now = self.current_time_from_expiration_clock(); + record(ReadOp::Miss(hash), now); None } } @@ -273,19 +280,20 @@ where pub(crate) fn apply_reads_writes_if_needed( inner: &impl InnerSync, ch: &Sender>, + now: Instant, housekeeper: Option<&HouseKeeperArc>, ) { let w_len = ch.len(); if let Some(hk) = housekeeper { - if Self::should_apply_writes(hk, w_len) { + if Self::should_apply_writes(hk, w_len, now) { hk.try_sync(inner); } } } pub(crate) fn invalidate_all(&self) { - let now = self.inner.current_time_from_expiration_clock(); + let now = self.current_time_from_expiration_clock(); self.inner.set_valid_after(now); } @@ -293,7 +301,7 @@ where &self, predicate: PredicateFun, ) -> Result { - let now = self.inner.current_time_from_expiration_clock(); + let now = self.current_time_from_expiration_clock(); self.inner.register_invalidation_predicate(predicate, now) } } @@ -316,7 +324,7 @@ where self.inner.get_key_value_and_then(key, hash, |k, entry| { let i = &self.inner; let (ttl, tti, va) = (&i.time_to_live(), &i.time_to_idle(), &i.valid_after()); - let now 
= i.current_time_from_expiration_clock(); + let now = self.current_time_from_expiration_clock(); if is_expired_entry_wo(ttl, va, entry, now) || is_expired_entry_ao(tti, va, entry, now) @@ -346,8 +354,12 @@ where S: BuildHasher + Clone + Send + Sync + 'static, { #[inline] - fn record_read_op(&self, op: ReadOp) -> Result<(), TrySendError>> { - self.apply_reads_if_needed(&self.inner); + fn record_read_op( + &self, + op: ReadOp, + now: Instant, + ) -> Result<(), TrySendError>> { + self.apply_reads_if_needed(&self.inner, now); let ch = &self.read_op_ch; match ch.try_send(op) { // Discard the ReadOp when the channel is full. @@ -357,8 +369,13 @@ where } #[inline] - pub(crate) fn do_insert_with_hash(&self, key: Arc, hash: u64, value: V) -> WriteOp { - let ts = self.inner.current_time_from_expiration_clock(); + pub(crate) fn do_insert_with_hash( + &self, + key: Arc, + hash: u64, + value: V, + ) -> (WriteOp, Instant) { + let ts = self.current_time_from_expiration_clock(); let weight = self.inner.weigh(&key, &value); let op_cnt1 = Rc::new(AtomicU8::new(0)); let op_cnt2 = Rc::clone(&op_cnt1); @@ -422,7 +439,7 @@ where ); match (op1, op2) { - (Some((_cnt, ins_op)), None) => ins_op, + (Some((_cnt, ins_op)), None) => (ins_op, ts), (None, Some((_cnt, old_entry, (old_last_accessed, old_last_modified), upd_op))) => { old_entry.unset_q_nodes(); if self.is_removal_notifier_enabled() { @@ -430,14 +447,14 @@ where .notify_upsert(key, &old_entry, old_last_accessed, old_last_modified); } crossbeam_epoch::pin().flush(); - upd_op + (upd_op, ts) } ( Some((cnt1, ins_op)), Some((cnt2, old_entry, (old_last_accessed, old_last_modified), upd_op)), ) => { if cnt1 > cnt2 { - ins_op + (ins_op, ts) } else { old_entry.unset_q_nodes(); if self.is_removal_notifier_enabled() { @@ -449,7 +466,7 @@ where ); } crossbeam_epoch::pin().flush(); - upd_op + (upd_op, ts) } } (None, None) => unreachable!(), @@ -486,24 +503,24 @@ where } #[inline] - fn apply_reads_if_needed(&self, inner: &Inner) { + fn 
apply_reads_if_needed(&self, inner: &Inner, now: Instant) { let len = self.read_op_ch.len(); if let Some(hk) = &self.housekeeper { - if Self::should_apply_reads(hk, len) { + if Self::should_apply_reads(hk, len, now) { hk.try_sync(inner); } } } #[inline] - fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize) -> bool { - hk.should_apply_reads(ch_len) + fn should_apply_reads(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + hk.should_apply_reads(ch_len, now) } #[inline] - fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize) -> bool { - hk.should_apply_writes(ch_len) + fn should_apply_writes(hk: &HouseKeeperArc, ch_len: usize, now: Instant) -> bool { + hk.should_apply_writes(ch_len, now) } } From 59ff5a0408a1d5a53b2097a38b2f31dc7b22a402 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Wed, 27 Jul 2022 08:39:11 +0800 Subject: [PATCH 05/15] Optionally disable thread pools in `sync::Cache` - Fix the time-interval handling in `BlockingHousekeeper`. - Add test cases to ensure the thread pools are disabled. --- src/common/concurrent/housekeeper.rs | 18 +++---- src/common/concurrent/thread_pool.rs | 9 +++- src/dash/base_cache.rs | 4 ++ src/sync/cache.rs | 64 +++++++++++++++++++++++++ src/sync/segment.rs | 70 ++++++++++++++++++++++++++++ src/sync_base/base_cache.rs | 4 ++ 6 files changed, 157 insertions(+), 12 deletions(-) diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index abfd90c8..5bc75470 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -29,6 +29,8 @@ use std::{ pub(crate) trait InnerSync { fn sync(&self, max_sync_repeats: usize) -> Option; + + fn now(&self) -> Instant; } #[derive(Clone, Debug)] @@ -124,29 +126,19 @@ impl Default for BlockingHousekeeper { impl BlockingHousekeeper { #[cfg(any(feature = "sync", feature = "future"))] - // NOTE: This method may update the `sync_after` field. 
fn should_apply_reads(&self, ch_len: usize, now: Instant) -> bool { self.should_apply(ch_len, READ_LOG_FLUSH_POINT / 8, now) } #[cfg(any(feature = "sync", feature = "future"))] - // NOTE: This method may update the `sync_after` field. fn should_apply_writes(&self, ch_len: usize, now: Instant) -> bool { self.should_apply(ch_len, WRITE_LOG_FLUSH_POINT / 8, now) } #[cfg(any(feature = "sync", feature = "future"))] - // NOTE: This method may update the `sync_after` field. #[inline] fn should_apply(&self, ch_len: usize, ch_flush_point: usize, now: Instant) -> bool { - if ch_len >= ch_flush_point { - true - } else if self.sync_after.instant().unwrap() >= now { - self.sync_after.set_instant(Self::sync_after(now)); - true - } else { - false - } + ch_len >= ch_flush_point || self.sync_after.instant().unwrap() >= now } fn try_sync(&self, cache: &T) -> bool { @@ -158,7 +150,11 @@ impl BlockingHousekeeper { Ordering::Relaxed, ) { Ok(_) => { + let now = cache.now(); + self.sync_after.set_instant(Self::sync_after(now)); + cache.sync(MAX_SYNC_REPEATS); + self.is_sync_running.store(false, Ordering::Release); true } diff --git a/src/common/concurrent/thread_pool.rs b/src/common/concurrent/thread_pool.rs index d652ff10..ace02f26 100644 --- a/src/common/concurrent/thread_pool.rs +++ b/src/common/concurrent/thread_pool.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc}; static REGISTRY: Lazy = Lazy::new(ThreadPoolRegistry::default); -#[derive(Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] #[cfg_attr(any(feature = "sync", feature = "future"), derive(Debug))] pub(crate) enum PoolName { Housekeeper, @@ -102,4 +102,11 @@ impl ThreadPoolRegistry { } } } + + #[cfg(all(test, feature = "sync"))] + pub(crate) fn enabled_pools() -> Vec { + let mut names: Vec<_> = REGISTRY.pools.read().keys().cloned().collect(); + names.sort_unstable(); + names + } } diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 9b3f16ed..4d8c6213 
100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -705,6 +705,10 @@ where None } } + + fn now(&self) -> Instant { + unreachable!() + } } // diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 29b38d8c..0d96b7e3 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -3061,6 +3061,70 @@ mod tests { assert_eq!(counters.value_dropped(), KEYS, "value_dropped"); } + // Ignored by default. This test cannot run in parallel with other tests. + #[test] + #[ignore] + fn enabled_thread_pools() { + use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry}; + + // Enable the housekeeper pool. + { + let cache = Cache::builder().thread_pool_enabled(true).build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper]); + } + + // Enable the housekeeper and invalidator pools. + { + let cache = Cache::builder() + .thread_pool_enabled(true) + .support_invalidation_closures() + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper, Invalidator]); + } + + // Queued delivery mode: Enable the housekeeper and removal notifier pools. + { + let listener = |_k, _v, _cause| {}; + let listener_conf = notification::Configuration::builder() + .delivery_mode(DeliveryMode::Queued) + .build(); + let cache = Cache::builder() + .thread_pool_enabled(true) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper, RemovalNotifier]); + } + + // Immediate delivery mode: Enable only the housekeeper pool. 
+ { + let listener = |_k, _v, _cause| {}; + let listener_conf = notification::Configuration::builder() + .delivery_mode(DeliveryMode::Immediate) + .build(); + let cache = Cache::builder() + .thread_pool_enabled(true) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper]); + } + + // Disable all pools. + { + let cache = Cache::builder().thread_pool_enabled(false).build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert!(enabled_pools.is_empty()); + } + } + #[test] fn test_debug_format() { let cache = Cache::new(10); diff --git a/src/sync/segment.rs b/src/sync/segment.rs index 7341de83..9208e7af 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1668,6 +1668,76 @@ mod tests { assert_eq!(counters.value_dropped(), KEYS, "value_dropped"); } + // Ignored by default. This test cannot run in parallel with other tests. + #[test] + #[ignore] + fn enabled_thread_pools() { + use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry}; + + const NUM_SEGMENTS: usize = 4; + + // Enable the housekeeper pool. + { + let cache = SegmentedCache::builder(NUM_SEGMENTS) + .thread_pool_enabled(true) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper]); + } + + // Enable the housekeeper and invalidator pools. + { + let cache = SegmentedCache::builder(NUM_SEGMENTS) + .thread_pool_enabled(true) + .support_invalidation_closures() + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper, Invalidator]); + } + + // Queued delivery mode: Enable the housekeeper and removal notifier pools. 
+ { + let listener = |_k, _v, _cause| {}; + let listener_conf = notification::Configuration::builder() + .delivery_mode(DeliveryMode::Queued) + .build(); + let cache = SegmentedCache::builder(NUM_SEGMENTS) + .thread_pool_enabled(true) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper, RemovalNotifier]); + } + + // Immediate delivery mode: Enable only the housekeeper pool. + { + let listener = |_k, _v, _cause| {}; + let listener_conf = notification::Configuration::builder() + .delivery_mode(DeliveryMode::Immediate) + .build(); + let cache = SegmentedCache::builder(NUM_SEGMENTS) + .thread_pool_enabled(true) + .eviction_listener_with_conf(listener, listener_conf) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert_eq!(enabled_pools, &[Housekeeper]); + } + + // Disable all pools. + { + let cache = SegmentedCache::builder(NUM_SEGMENTS) + .thread_pool_enabled(false) + .build(); + cache.insert('a', "a"); + let enabled_pools = ThreadPoolRegistry::enabled_pools(); + assert!(enabled_pools.is_empty()); + } + } + #[test] fn test_debug_format() { let cache = SegmentedCache::new(10, 4); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index daa30d58..2b7f7013 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -1116,6 +1116,10 @@ where None } } + + fn now(&self) -> Instant { + self.current_time_from_expiration_clock() + } } // From e16ffea065bbab367da452071303c033d79062af Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Wed, 27 Jul 2022 19:48:18 +0800 Subject: [PATCH 06/15] Optionally disable thread pools in `sync::Cache` Fix compile errors when `default-features` is specified. 
--- src/common/concurrent/housekeeper.rs | 8 ++++++-- src/dash/base_cache.rs | 1 + src/sync_base/base_cache.rs | 1 + 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/common/concurrent/housekeeper.rs b/src/common/concurrent/housekeeper.rs index 5bc75470..778123ff 100644 --- a/src/common/concurrent/housekeeper.rs +++ b/src/common/concurrent/housekeeper.rs @@ -30,6 +30,7 @@ use std::{ pub(crate) trait InnerSync { fn sync(&self, max_sync_repeats: usize) -> Option; + #[cfg(any(feature = "sync", feature = "future"))] fn now(&self) -> Instant; } @@ -150,8 +151,11 @@ impl BlockingHousekeeper { Ordering::Relaxed, ) { Ok(_) => { - let now = cache.now(); - self.sync_after.set_instant(Self::sync_after(now)); + #[cfg(any(feature = "sync", feature = "future"))] + { + let now = cache.now(); + self.sync_after.set_instant(Self::sync_after(now)); + } cache.sync(MAX_SYNC_REPEATS); diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 4d8c6213..4fc316dd 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -706,6 +706,7 @@ where } } + #[cfg(any(feature = "sync", feature = "future"))] fn now(&self) -> Instant { unreachable!() } diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 2b7f7013..6794d218 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -1117,6 +1117,7 @@ where } } + #[cfg(any(feature = "sync", feature = "future"))] fn now(&self) -> Instant { self.current_time_from_expiration_clock() } From 7723c63e509b98eb916f537938c19fb3febcc383 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Wed, 27 Jul 2022 20:42:59 +0800 Subject: [PATCH 07/15] CI: Pin pulldown-cmark to v0.9.1 for testing with Rust 1.51.0 pulldown-cmark v0.9.2 requires Rust 2021 edition. 
--- .github/workflows/CI.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 23d21112..a3b754a2 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -57,12 +57,14 @@ jobs: # hashbrown >= v0.12 requires Rust 2021 edition. # native-tls >= v0.2.9 requires more recent Rust version. # async-global-executor >= 2.1 requires Rust 2021 edition. + # pulldown-cmark >= 0.9.2 requires Rust 2021 edition. run: | cargo update -p dashmap --precise 5.2.0 cargo update -p indexmap --precise 1.8.2 cargo update -p hashbrown --precise 0.11.2 cargo update -p native-tls --precise 0.2.8 cargo update -p async-global-executor --precise 2.0.4 + cargo update -p pulldown-cmark --precise 0.9.1 - name: Show cargo tree uses: actions-rs/cargo@v1 From 2d028f6990dd877ee536856b7ec4a48fc585a8b7 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Wed, 27 Jul 2022 20:47:58 +0800 Subject: [PATCH 08/15] CI: Pin pulldown-cmark to v0.9.1 for testing with Rust 1.51.0 pulldown-cmark v0.9.2 requires Rust 2021 edition. --- .github/workflows/CIQuantaDisabled.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/CIQuantaDisabled.yml b/.github/workflows/CIQuantaDisabled.yml index c59a7a37..add1adef 100644 --- a/.github/workflows/CIQuantaDisabled.yml +++ b/.github/workflows/CIQuantaDisabled.yml @@ -57,12 +57,14 @@ jobs: # hashbrown >= v0.12 requires Rust 2021 edition. # native-tls >= v0.2.9 requires more recent Rust version. # async-global-executor >= 2.1 requires Rust 2021 edition. + # pulldown-cmark >= 0.9.2 requires Rust 2021 edition.
run: | cargo update -p dashmap --precise 5.2.0 cargo update -p indexmap --precise 1.8.2 cargo update -p hashbrown --precise 0.11.2 cargo update -p native-tls --precise 0.2.8 cargo update -p async-global-executor --precise 2.0.4 + cargo update -p pulldown-cmark --precise 0.9.1 - name: Run tests (debug, but no quanta feature) uses: actions-rs/cargo@v1 From 8c3b7db329c57b821d0a426baa4f8e468fecbff9 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 30 Jul 2022 17:31:09 +0800 Subject: [PATCH 09/15] Update the change log --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3840a6e0..023ce40a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ ## Version 0.9.3 +### Added + +- Add a configuration option to the following caches to avoid to start the global + thread pools ([#165][gh-pull-0165]): + - `sync::Cache` + - `sync::SegmentedCache` + ### Fixed - Ensure that the following caches will drop the value of evicted entries immediately @@ -463,6 +470,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25). [gh-pull-0169]: https://github.com/moka-rs/moka/pull/169/ [gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/ +[gh-pull-0165]: https://github.com/moka-rs/moka/pull/165/ [gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/ [gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/ [gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/ From 72b2407c27eb8c16c08a148bef341a561f4dd08f Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 30 Jul 2022 17:41:42 +0800 Subject: [PATCH 10/15] Optionally disable thread pools in `sync::Cache` Remove the debugging codes that were already commented out. 
--- src/common/concurrent/thread_pool.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/common/concurrent/thread_pool.rs b/src/common/concurrent/thread_pool.rs index ace02f26..02f4c38c 100644 --- a/src/common/concurrent/thread_pool.rs +++ b/src/common/concurrent/thread_pool.rs @@ -35,7 +35,6 @@ pub(crate) struct ThreadPool { impl ThreadPool { fn new(name: PoolName, num_threads: usize) -> Self { - // println!("Created pool: {:?}", name); let pool = ScheduledThreadPool::with_name(name.thread_name_template(), num_threads); Self { name, @@ -45,12 +44,6 @@ impl ThreadPool { } } -// impl Drop for ThreadPool { -// fn drop(&mut self) { -// println!("Dropped pool: {:?}", self.name) -// } -// } - pub(crate) struct ThreadPoolRegistry { pools: RwLock>>, } From 683673a222e7bd87cd86409890caf9bde6303afc Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 30 Jul 2022 17:51:39 +0800 Subject: [PATCH 11/15] Optionally disable thread pools in `sync::Cache` Revert unnecessary changes in `dash::base_cache` module. --- src/dash/base_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 4fc316dd..c4a7eab2 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -349,12 +349,12 @@ where #[inline] fn should_apply_reads(ch_len: usize) -> bool { - ch_len >= READ_LOG_FLUSH_POINT / 8 + ch_len >= READ_LOG_FLUSH_POINT } #[inline] fn should_apply_writes(ch_len: usize) -> bool { - ch_len >= WRITE_LOG_FLUSH_POINT / 8 + ch_len >= WRITE_LOG_FLUSH_POINT } } From 4ac7d8c45dd4dcc47c3edcbb640ea0cf8d9930a1 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Wed, 3 Aug 2022 22:43:06 +0800 Subject: [PATCH 12/15] Optionally disable thread pools in `sync::Cache` Run tests on CI to check if enabling/disabling the thread pools work as expected. 
--- .github/workflows/CI.yml | 12 ++++++++++++ src/sync/cache.rs | 2 +- src/sync/segment.rs | 2 +- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index a3b754a2..3fc8b607 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -78,6 +78,18 @@ jobs: command: test args: --features sync + - name: Run tests (debug, sync feature, thread-pool test for sync::Cache) + uses: actions-rs/cargo@v1 + with: + command: test + args: --lib --features sync sync::cache::tests::enabling_and_disabling_thread_pools -- --exact --ignored + + - name: Run tests (debug, sync feature, thread-pool test for sync::SegmentedCache) + uses: actions-rs/cargo@v1 + with: + command: test + args: --lib --features sync sync::segment::tests::enabling_and_disabling_thread_pools -- --exact --ignored + - name: Run tests (release, sync feature) uses: actions-rs/cargo@v1 with: diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 0d96b7e3..b9aee80b 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -3064,7 +3064,7 @@ mod tests { // Ignored by default. This test cannot run in parallel with other tests. #[test] #[ignore] - fn enabled_thread_pools() { + fn enabling_and_disabling_thread_pools() { use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry}; // Enable the housekeeper pool. diff --git a/src/sync/segment.rs b/src/sync/segment.rs index 9208e7af..ba6e8d21 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -1671,7 +1671,7 @@ mod tests { // Ignored by default. This test cannot run in parallel with other tests. 
#[test] #[ignore] - fn enabled_thread_pools() { + fn enabling_and_disabling_thread_pools() { use crate::common::concurrent::thread_pool::{PoolName::*, ThreadPoolRegistry}; const NUM_SEGMENTS: usize = 4; From 0edd79f52be0a82ef347a57cfabcb2fcd0051b8b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Thu, 4 Aug 2022 05:56:47 +0800 Subject: [PATCH 13/15] Update the min version of a dev-dependency anyhow from 1.0.0 to 1.0.19 - Earlier versions will not compile with recent nightly compiler: rustc 1.64.0-nightly (4493a0f47 2022-08-02) - Note that the latest version of anyhow is 1.0.59. --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a0561c07..637ca682 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,7 @@ log = { version = "0.4", optional = true } [dev-dependencies] actix-rt = { version = "2.7", default-features = false } -anyhow = "1.0" +anyhow = "1.0.19" async-std = { version = "1.11", features = ["attributes"] } env_logger = "0.9" getrandom = "0.2" From 4041e1fd8914eb16ae5aa8d25dab0207124fd05b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Thu, 4 Aug 2022 06:41:44 +0800 Subject: [PATCH 14/15] Optionally disable thread pools in `sync::Cache` Write the doc comment for `sync::CacheBuilder::thread_pool_enabled`. --- src/sync/builder.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 638e6659..b2745017 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -389,11 +389,15 @@ impl CacheBuilder { } } - /// Specify whether or not to enable thread pool for housekeeping tasks, such as - /// ... `true` to enable and `false` to disable. - /// - /// The thread pool is enabled by default in current version of Moka but the - /// default will be changed to disabled in future version. + /// Specify whether or not to enable the thread pool for housekeeping tasks. 
+ /// These tasks include removing expired entries and updating the LRU queue and + /// LFU filter. `true` to enable and `false` to disable. (Default: `true`) + /// + /// If disabled, the housekeeping tasks will be executed by a client thread when + /// necessary. + /// + /// NOTE: The default value will be changed to `false` in a future release + /// (v0.10.0 or v0.11.0). pub fn thread_pool_enabled(self, v: bool) -> Self { Self { thread_pool_enabled: v, From 9f61c6bca2ea427720d14b55ab0e95fd1f5c71f2 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Thu, 4 Aug 2022 06:44:11 +0800 Subject: [PATCH 15/15] cargo fmt --- src/sync/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sync/builder.rs b/src/sync/builder.rs index b2745017..e7d3075d 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -392,7 +392,7 @@ impl CacheBuilder { /// Specify whether or not to enable the thread pool for housekeeping tasks. /// These tasks include removing expired entries and updating the LRU queue and /// LFU filter. `true` to enable and `false` to disable. (Default: `true`) - /// + /// /// If disabled, the housekeeping tasks will be executed by a client thread when /// necessary. ///