Skip to content

Commit

Permalink
Merge pull request #82 from moka-rs/unstable-debug-counters
Browse files Browse the repository at this point in the history
Add unstable-debug-counters feature
  • Loading branch information
tatsuya6502 authored Feb 2, 2022
2 parents 6b6dffe + 3793f97 commit 28a6d70
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"rust-analyzer.cargo.features": ["future"],
"rust-analyzer.cargo.features": ["future", "unstable-debug-counters"],
"rust-analyzer.server.extraEnv": {
"CARGO_TARGET_DIR": "target/ra"
},
Expand Down
14 changes: 10 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ readme = "README.md"
exclude = [".circleci", ".devcontainer", ".github", ".gitpod.yml", ".vscode"]
build = "build.rs"

# https://docs.rs/about/metadata
[package.metadata.docs.rs]
features = ["future"]

[features]
default = ["atomic64"]

Expand All @@ -31,6 +27,11 @@ future = ["async-io", "async-lock", "futures-util"]
# https://github.com/moka-rs/moka#resolving-compile-errors-on-some-32-bit-platforms
atomic64 = []

# This unstable feature adds `GlobalDebugCounters::current` function, which returns
# counters of internal object construction and destruction. It will have some
# performance impacts and is intended for debugging perpose.
unstable-debug-counters = ["future"]

[dependencies]
crossbeam-channel = "0.5.2"
crossbeam-epoch = "0.8.2"
Expand Down Expand Up @@ -65,6 +66,11 @@ trybuild = "1.0"
[target.'cfg(skeptic)'.build-dependencies]
skeptic = "0.13"

# https://docs.rs/about/metadata
[package.metadata.docs.rs]
# Build the doc with "future" feature enabled.
features = ["future"]

# ----------------------------------
# RUSTSEC, etc.
#
Expand Down
28 changes: 28 additions & 0 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ impl<K, V> BucketArray<K, V> {

let buckets = buckets.into_boxed_slice();

#[cfg(feature = "unstable-debug-counters")]
{
use crate::sync::debug_counters::InternalGlobalDebugCounters as Counters;

let size = (buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
Counters::bucket_array_created(size);
}

Self {
buckets,
next: Atomic::null(),
Expand All @@ -50,6 +58,16 @@ impl<K, V> BucketArray<K, V> {
}
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for BucketArray<K, V> {
fn drop(&mut self) {
use crate::sync::debug_counters::InternalGlobalDebugCounters as Counters;

let size = (self.buckets.len() * std::mem::size_of::<Atomic<Bucket<K, V>>>()) as u64;
Counters::bucket_array_dropped(size);
}
}

impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
pub(crate) fn get<Q: ?Sized + Eq>(
&self,
Expand Down Expand Up @@ -394,13 +412,23 @@ pub(crate) struct Bucket<K, V> {

impl<K, V> Bucket<K, V> {
pub(crate) fn new(key: K, value: V) -> Bucket<K, V> {
#[cfg(feature = "unstable-debug-counters")]
crate::sync::debug_counters::InternalGlobalDebugCounters::bucket_created();

Self {
key,
maybe_value: MaybeUninit::new(value),
}
}
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for Bucket<K, V> {
fn drop(&mut self) {
crate::sync::debug_counters::InternalGlobalDebugCounters::bucket_dropped();
}
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct RelocatedError;

Expand Down
41 changes: 21 additions & 20 deletions src/cht/segment/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,27 @@ impl<K, V, S> HashMap<K, V, S> {
// self.len() == 0
// }

// /// Returns the number of elements the map can hold without reallocating any
// /// bucket pointer arrays.
// ///
// /// Note that all mutating operations except removal will result in a bucket
// /// being allocated or reallocated.
// ///
// /// # Safety
// ///
// /// This method on its own is safe, but other threads can increase the
// /// capacity of each segment at any time by adding elements.
// pub(crate) fn capacity(&self) -> usize {
// let guard = &crossbeam_epoch::pin();

// self.segments
// .iter()
// .map(|s| s.bucket_array.load_consume(guard))
// .map(|p| unsafe { p.as_ref() })
// .map(|a| a.map(BucketArray::capacity).unwrap_or(0))
// .sum::<usize>()
// }
#[cfg(feature = "unstable-debug-counters")]
/// Returns the number of elements the map can hold without reallocating any
/// bucket pointer arrays.
///
/// Note that all mutating operations except removal will result in a bucket
/// being allocated or reallocated.
///
/// # Safety
///
/// This method on its own is safe, but other threads can increase the
/// capacity of each segment at any time by adding elements.
pub(crate) fn capacity(&self) -> usize {
let guard = &crossbeam_epoch::pin();

self.segments
.iter()
.map(|s| s.bucket_array.load_consume(guard))
.map(|p| unsafe { p.as_ref() })
.map(|a| a.map(BucketArray::capacity).unwrap_or(0))
.sum::<usize>()
}

// /// Returns the number of elements the `index`-th segment of the map can
// /// hold without reallocating a bucket pointer array.
Expand Down
10 changes: 10 additions & 0 deletions src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ impl<T> std::fmt::Debug for DeqNode<T> {

impl<T> DeqNode<T> {
pub(crate) fn new(region: CacheRegion, element: T) -> Self {
#[cfg(feature = "unstable-debug-counters")]
crate::sync::debug_counters::InternalGlobalDebugCounters::deq_node_created();

Self {
region,
next: None,
Expand All @@ -55,6 +58,13 @@ impl<T> DeqNode<T> {
}
}

#[cfg(feature = "unstable-debug-counters")]
impl<T> Drop for DeqNode<T> {
fn drop(&mut self) {
crate::sync::debug_counters::InternalGlobalDebugCounters::deq_node_dropped();
}
}

/// Cursor is used to remember the current iterating position.
enum DeqCursor<T> {
Node(NonNull<DeqNode<T>>),
Expand Down
5 changes: 5 additions & 0 deletions src/common/frequency_sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ impl FrequencySketch {
hash += hash >> 32;
(hash & self.table_mask) as usize
}

#[cfg(feature = "unstable-debug-counters")]
pub(crate) fn table_size(&self) -> u64 {
(self.table.len() * std::mem::size_of::<u64>()) as u64
}
}

// Methods only available for testing.
Expand Down
8 changes: 8 additions & 0 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ use crate::{
PredicateError,
};

#[cfg(feature = "unstable-debug-counters")]
use crate::sync::debug_counters::CacheDebugStats;

use crossbeam_channel::{Sender, TrySendError};
use std::{
any::TypeId,
Expand Down Expand Up @@ -695,6 +698,11 @@ where
1
}

#[cfg(feature = "unstable-debug-counters")]
pub fn debug_stats(&self) -> CacheDebugStats {
self.base.debug_stats()
}

#[cfg(test)]
fn estimated_entry_count(&self) -> u64 {
self.base.estimated_entry_count()
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub(crate) mod common;

pub use common::error::PredicateError;

#[cfg(feature = "unstable-debug-counters")]
pub use sync::debug_counters::GlobalDebugCounters;

#[cfg(test)]
mod tests {
// #[cfg(trybuild)]
Expand Down
16 changes: 16 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ mod invalidator;
mod segment;
mod value_initializer;

#[cfg(feature = "unstable-debug-counters")]
pub(crate) mod debug_counters;

pub use builder::CacheBuilder;
pub use cache::Cache;
pub use segment::SegmentedCache;
Expand Down Expand Up @@ -190,6 +193,9 @@ pub(crate) struct ValueEntry<K, V> {

impl<K, V> ValueEntry<K, V> {
fn new(value: V, entry_info: TrioArc<EntryInfo>) -> Self {
#[cfg(feature = "unstable-debug-counters")]
self::debug_counters::InternalGlobalDebugCounters::value_entry_created();

Self {
value,
info: entry_info,
Expand All @@ -201,6 +207,9 @@ impl<K, V> ValueEntry<K, V> {
}

fn new_from(value: V, entry_info: TrioArc<EntryInfo>, other: &Self) -> Self {
#[cfg(feature = "unstable-debug-counters")]
self::debug_counters::InternalGlobalDebugCounters::value_entry_created();

let nodes = {
let other_nodes = other.nodes.lock();
DeqNodes {
Expand Down Expand Up @@ -267,6 +276,13 @@ impl<K, V> ValueEntry<K, V> {
}
}

#[cfg(feature = "unstable-debug-counters")]
impl<K, V> Drop for ValueEntry<K, V> {
fn drop(&mut self) {
self::debug_counters::InternalGlobalDebugCounters::value_entry_dropped();
}
}

impl<K, V> AccessTime for TrioArc<ValueEntry<K, V>> {
#[inline]
fn last_accessed(&self) -> Option<Instant> {
Expand Down
43 changes: 37 additions & 6 deletions src/sync/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use super::{
AccessTime, KeyDate, KeyHash, KeyHashDate, KvEntry, PredicateId, ReadOp, ValueEntry, Weigher,
WriteOp,
};

#[cfg(feature = "unstable-debug-counters")]
use super::debug_counters::CacheDebugStats;

use crate::{
common::{
self,
Expand Down Expand Up @@ -215,6 +219,11 @@ where
self.inner.time_to_idle()
}

#[cfg(feature = "unstable-debug-counters")]
pub fn debug_stats(&self) -> CacheDebugStats {
self.inner.debug_stats()
}

#[cfg(test)]
pub(crate) fn estimated_entry_count(&self) -> u64 {
self.inner.estimated_entry_count()
Expand Down Expand Up @@ -586,6 +595,19 @@ where
self.time_to_idle
}

#[cfg(feature = "unstable-debug-counters")]
pub fn debug_stats(&self) -> CacheDebugStats {
let ec = self.entry_count.load();
let ws = self.weighted_size.load();

CacheDebugStats::new(
ec,
ws,
(self.cache.capacity() * 2) as u64,
self.frequency_sketch.read().table_size(),
)
}

#[cfg(test)]
#[inline]
fn estimated_entry_count(&self) -> u64 {
Expand Down Expand Up @@ -688,6 +710,18 @@ where
}
}

#[cfg(feature = "unstable-debug-counters")]
mod batch_size {
pub(crate) const EVICTION_BATCH_SIZE: usize = 10_000;
pub(crate) const INVALIDATION_BATCH_SIZE: usize = 10_000;
}

#[cfg(not(feature = "unstable-debug-counters"))]
mod batch_size {
pub(crate) const EVICTION_BATCH_SIZE: usize = 500;
pub(crate) const INVALIDATION_BATCH_SIZE: usize = 500;
}

// TODO: Divide this method into smaller methods so that unit tests can do more
// precise testing.
// - sync_reads
Expand All @@ -701,9 +735,6 @@ where
S: BuildHasher + Clone + Send + Sync + 'static,
{
fn sync(&self, max_repeats: usize) -> Option<SyncPace> {
const EVICTION_BATCH_SIZE: usize = 500;
const INVALIDATION_BATCH_SIZE: usize = 500;

let mut deqs = self.deques.lock();
let mut calls = 0;
let mut should_sync = true;
Expand Down Expand Up @@ -733,7 +764,7 @@ where
}

if self.has_expiry() || self.has_valid_after() {
self.evict_expired(&mut deqs, EVICTION_BATCH_SIZE, &mut counters);
self.evict_expired(&mut deqs, batch_size::EVICTION_BATCH_SIZE, &mut counters);
}

if self.invalidator_enabled {
Expand All @@ -742,7 +773,7 @@ where
self.invalidate_entries(
invalidator,
&mut deqs,
INVALIDATION_BATCH_SIZE,
batch_size::INVALIDATION_BATCH_SIZE,
&mut counters,
);
}
Expand All @@ -754,7 +785,7 @@ where
if weights_to_evict > 0 {
self.evict_lru_entries(
&mut deqs,
EVICTION_BATCH_SIZE,
batch_size::EVICTION_BATCH_SIZE,
weights_to_evict,
&mut counters,
);
Expand Down
Loading

0 comments on commit 28a6d70

Please sign in to comment.