Skip to content

Commit

Permalink
Add the policy_snapshot method to the sync cache to get a snapshot of coldest and hottest entries
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya6502 committed Jan 20, 2025
1 parent 2d932ef commit 5c0caaf
Show file tree
Hide file tree
Showing 9 changed files with 395 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
pub(crate) mod builder_utils;
pub(crate) mod concurrent;
pub(crate) mod deque;
pub(crate) mod entry;
pub mod entry;
pub(crate) mod error;
pub(crate) mod frequency_sketch;
pub(crate) mod time;
Expand Down
8 changes: 8 additions & 0 deletions src/common/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,19 @@ impl<K> KeyHashDate<K> {
self.entry_info.last_accessed()
}

/// Returns the expiration time reported by this element's `entry_info`, or
/// `None` when it reports none.
pub(crate) fn expiration_time(&self) -> Option<Instant> {
self.entry_info.expiration_time()
}

/// Returns whether this element's `entry_info` reports the entry as dirty.
pub(crate) fn is_dirty(&self) -> bool {
self.entry_info.is_dirty()
}
}

/// A deque node whose element is a `KeyHashDate<K>`.
pub(crate) type KeyHashDateNode<K> = DeqNode<KeyHashDate<K>>;

/// A non-null raw pointer to a `KeyHashDateNode<K>`.
pub(crate) type KeyHashDateNodePtr<K> = NonNull<KeyHashDateNode<K>>;

pub(crate) struct KvEntry<K, V> {
pub(crate) key: Arc<K>,
pub(crate) entry: MiniArc<ValueEntry<K, V>>,
Expand Down
38 changes: 27 additions & 11 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,30 @@ use super::constants::LOG_SYNC_INTERVAL_MILLIS;
use super::constants::{READ_LOG_FLUSH_POINT, WRITE_LOG_FLUSH_POINT};
use crate::common::time::{AtomicInstant, Instant};
use crate::common::HousekeeperConfig;
use crate::entry::{EntrySnapshot, EntrySnapshotConfig};

use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};

pub(crate) trait InnerSync {
pub(crate) trait InnerSync<K> {
/// Runs the pending tasks. Returns `true` if there are more entries to evict in
/// next run.
fn run_pending_tasks(
&self,
timeout: Option<Duration>,
max_log_sync_repeats: u32,
eviction_batch_size: u32,
) -> bool;
snapshot_request: Option<EntrySnapshotConfig>,
) -> (bool, Option<EntrySnapshot<K>>);

fn now(&self) -> Instant;
}

pub(crate) struct Housekeeper {
pub(crate) struct Housekeeper<K> {
run_lock: Mutex<()>,
run_after: AtomicInstant,
/// A flag to indicate if the last call on `run_pending_tasks` method left some
Expand All @@ -46,9 +49,10 @@ pub(crate) struct Housekeeper {
/// Default: `EVICTION_BATCH_SIZE`.
eviction_batch_size: u32,
auto_run_enabled: AtomicBool,
key_ty: PhantomData<K>,
}

impl Housekeeper {
impl<K> Housekeeper<K> {
pub(crate) fn new(
is_eviction_listener_enabled: bool,
config: HousekeeperConfig,
Expand All @@ -71,6 +75,7 @@ impl Housekeeper {
max_log_sync_repeats: config.max_log_sync_repeats,
eviction_batch_size: config.eviction_batch_size,
auto_run_enabled: AtomicBool::new(true),
key_ty: PhantomData,
}
}

Expand Down Expand Up @@ -102,28 +107,39 @@ impl Housekeeper {
&& (ch_len >= ch_flush_point || now >= self.run_after.instant().unwrap())
}

pub(crate) fn run_pending_tasks<T: InnerSync>(&self, cache: &T) {
pub(crate) fn run_pending_tasks<T: InnerSync<K>>(
&self,
cache: &T,
snapshot_config: Option<EntrySnapshotConfig>,
) -> Option<EntrySnapshot<K>> {
let lock = self.run_lock.lock();
self.do_run_pending_tasks(cache, lock);
self.do_run_pending_tasks(cache, lock, snapshot_config)
}

pub(crate) fn try_run_pending_tasks<T: InnerSync>(&self, cache: &T) -> bool {
pub(crate) fn try_run_pending_tasks<T: InnerSync<K>>(&self, cache: &T) -> bool {
if let Some(lock) = self.run_lock.try_lock() {
self.do_run_pending_tasks(cache, lock);
self.do_run_pending_tasks(cache, lock, None);
true
} else {
false
}
}

fn do_run_pending_tasks<T: InnerSync>(&self, cache: &T, _lock: MutexGuard<'_, ()>) {
fn do_run_pending_tasks<T: InnerSync<K>>(
&self,
cache: &T,
_lock: MutexGuard<'_, ()>,
snapshot_config: Option<EntrySnapshotConfig>,
) -> Option<EntrySnapshot<K>> {
let now = cache.now();
self.run_after.set_instant(Self::sync_after(now));
let timeout = self.maintenance_task_timeout;
let repeats = self.max_log_sync_repeats;
let batch_size = self.eviction_batch_size;
let more_to_evict = cache.run_pending_tasks(timeout, repeats, batch_size);
let (more_to_evict, snap) =
cache.run_pending_tasks(timeout, repeats, batch_size, snapshot_config);
self.set_more_entries_to_evict(more_to_evict);
snap
}

fn sync_after(now: Instant) -> Instant {
Expand All @@ -133,7 +149,7 @@ impl Housekeeper {
}

#[cfg(test)]
impl Housekeeper {
impl<K> Housekeeper<K> {
/// Clears the `auto_run_enabled` flag (stores `false` with relaxed ordering).
///
/// Only compiled for tests: the enclosing `impl` is gated by `#[cfg(test)]`.
pub(crate) fn disable_auto_run(&self) {
self.auto_run_enabled.store(false, Ordering::Relaxed);
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl<T> DeqNode<T> {
}
}

/// Returns the `prev` link stored in the node that `this` points to, or `None`
/// when that link is unset.
pub(crate) fn prev_node_ptr(this: NonNull<Self>) -> Option<NonNull<DeqNode<T>>> {
    // SAFETY: NOTE(review) - presumably callers only pass pointers to nodes that
    // are still alive and not mutably aliased; that contract is not visible
    // here, so confirm it against the call sites.
    let node = unsafe { this.as_ref() };
    node.prev
}

/// Returns the `next` link stored in the node that `this` points to, or `None`
/// when that link is unset.
pub(crate) fn next_node_ptr(this: NonNull<Self>) -> Option<NonNull<DeqNode<T>>> {
    // SAFETY: NOTE(review) - presumably callers only pass pointers to nodes that
    // are still alive and not mutably aliased; that contract is not visible
    // here, so confirm it against the call sites.
    let node = unsafe { this.as_ref() };
    node.next
}
Expand Down Expand Up @@ -162,6 +166,10 @@ impl<T> Deque<T> {
self.tail.as_ref().map(|node| unsafe { node.as_ref() })
}

/// Returns a copy of the raw pointer to the node at the back of the list, or
/// `None` when there is no tail node.
pub(crate) fn peek_back_ptr(&self) -> Option<NonNull<DeqNode<T>>> {
    // `Option<NonNull<_>>` is `Copy`, so the tail pointer is simply copied out.
    self.tail
}

/// Adds the given node to the back of the list.
pub(crate) fn push_back(&mut self, mut node: Box<DeqNode<T>>) -> NonNull<DeqNode<T>> {
// This method takes care not to create mutable references to whole nodes,
Expand Down
191 changes: 190 additions & 1 deletion src/common/entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
hash::{BuildHasher, Hash},
sync::Arc,
time::{Duration, Instant},
};

use crate::sync::Cache;

use super::concurrent::KeyHashDate;

/// A snapshot of a single entry in the cache.
///
Expand Down Expand Up @@ -90,3 +99,183 @@ impl<K, V> Entry<K, V> {
self.is_old_value_replaced
}
}

/// Metadata describing a single cache entry at the moment it was inspected.
///
/// Values are either supplied directly through [`EntryMetadata::new`] or derived
/// from an internal deque element.
#[derive(Clone, Debug)]
pub struct EntryMetadata {
    /// The region the entry was reported in.
    region: EntryRegion,
    /// The policy weight recorded for the entry.
    policy_weight: u32,
    /// The estimated access frequency supplied when the metadata was built.
    estimated_frequency: u8,
    /// When the entry was last modified.
    last_modified: Instant,
    /// When the entry was last accessed.
    last_accessed: Instant,
    /// When the entry expires, or `None` if no expiration applies.
    expiration_time: Option<Instant>,
}

impl EntryMetadata {
    /// Creates an `EntryMetadata` from already-resolved values.
    pub fn new(
        region: EntryRegion,
        policy_weight: u32,
        estimated_frequency: u8,
        last_modified: Instant,
        last_accessed: Instant,
        expiration_time: Option<Instant>,
    ) -> Self {
        Self {
            region,
            policy_weight,
            estimated_frequency,
            last_modified,
            last_accessed,
            expiration_time,
        }
    }

    /// Builds the metadata for a deque `element`.
    ///
    /// - `region` and `estimated_frequency` are stored as given.
    /// - `clock` converts the element's internal timestamps to `std::time::Instant`.
    /// - `time_to_live` / `time_to_idle` are only consulted when the element
    ///   carries no expiration time of its own.
    pub(crate) fn from_element<K>(
        region: EntryRegion,
        estimated_frequency: u8,
        element: &KeyHashDate<K>,
        clock: &super::time::Clock,
        time_to_live: Option<Duration>,
        time_to_idle: Option<Duration>,
    ) -> Self {
        // NOTE: `last_accessed` and `last_modified` should be `Some` since we
        // assume the element is not dirty. `unwrap_or_default` falls back to the
        // default internal `Instant` instead of panicking if they are `None`.
        let last_modified = clock.to_std_instant(element.last_modified().unwrap_or_default());
        let last_accessed = clock.to_std_instant(element.last_accessed().unwrap_or_default());

        // Read the element's expiration time exactly once. Checking `is_some()`
        // and then reading again could observe two different values if the
        // element is updated in between, silently skipping the TTL/TTI fallback.
        let expiration_time = match element.expiration_time() {
            // Per-entry expiration is in use: the element carries the time.
            Some(ts) => Some(clock.to_std_instant(ts)),
            // Otherwise derive it from the cache-wide TTL / TTI settings.
            // NOTE(review): `Instant + Duration` panics on overflow; presumably
            // the configured durations are bounded elsewhere - confirm.
            None => {
                let by_ttl = time_to_live.map(|ttl| last_modified + ttl);
                let by_tti = time_to_idle.map(|tti| last_accessed + tti);
                match (by_ttl, by_tti) {
                    // Both apply: whichever comes first wins.
                    (Some(ttl_at), Some(tti_at)) => Some(ttl_at.min(tti_at)),
                    (ttl_at, tti_at) => ttl_at.or(tti_at),
                }
            }
        };

        Self {
            region,
            policy_weight: element.entry_info().policy_weight(),
            estimated_frequency,
            last_modified,
            last_accessed,
            expiration_time,
        }
    }

    /// Returns the region recorded for the entry.
    pub fn region(&self) -> EntryRegion {
        self.region
    }

    /// Returns the policy weight recorded for the entry.
    pub fn policy_weight(&self) -> u32 {
        self.policy_weight
    }

    /// Returns the estimated access frequency recorded for the entry.
    pub fn estimated_frequency(&self) -> u8 {
        self.estimated_frequency
    }

    /// Returns when the entry was last modified.
    pub fn last_modified(&self) -> Instant {
        self.last_modified
    }

    /// Returns when the entry was last accessed.
    pub fn last_accessed(&self) -> Instant {
        self.last_accessed
    }

    /// Returns when the entry expires, or `None` if no expiration applies.
    pub fn expiration_time(&self) -> Option<Instant> {
        self.expiration_time
    }
}

/// The region an entry was reported in.
///
/// NOTE(review): presumably these correspond to the cache policy's window and
/// main regions - confirm against the policy implementation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EntryRegion {
    /// The entry was reported in the window region.
    Window,
    /// The entry was reported in the main region.
    Main,
}

/// A captured snapshot holding two lists of entries, `coldest` and `hottest`,
/// each pairing a key with its [`EntryMetadata`].
#[derive(Clone, Debug)]
pub struct EntrySnapshot<K> {
    /// The instant associated with this snapshot.
    snapshot_at: Instant,
    /// The entries reported as coldest.
    coldest: Vec<(Arc<K>, EntryMetadata)>,
    /// The entries reported as hottest.
    hottest: Vec<(Arc<K>, EntryMetadata)>,
}

impl<K> EntrySnapshot<K> {
    /// Assembles a snapshot from its timestamp and the two entry lists.
    pub(crate) fn new(
        snapshot_at: Instant,
        coldest: Vec<(Arc<K>, EntryMetadata)>,
        hottest: Vec<(Arc<K>, EntryMetadata)>,
    ) -> Self {
        Self {
            snapshot_at,
            coldest,
            hottest,
        }
    }

    /// Returns the instant associated with this snapshot.
    pub fn snapshot_at(&self) -> Instant {
        self.snapshot_at
    }

    /// Returns the coldest entries as `(key, metadata)` pairs.
    pub fn coldest(&self) -> &[(Arc<K>, EntryMetadata)] {
        self.coldest.as_slice()
    }

    /// Returns the hottest entries as `(key, metadata)` pairs.
    pub fn hottest(&self) -> &[(Arc<K>, EntryMetadata)] {
        self.hottest.as_slice()
    }
}

/// A builder-style request for an [`EntrySnapshot`].
///
/// Holds the cache to capture from together with the configuration that the
/// `with_*` methods adjust before `capture` is called.
pub struct EntrySnapshotRequest<K, V, S> {
    /// The cache the snapshot will be captured from.
    cache: Cache<K, V, S>,
    /// The counts configured so far for this request.
    config: EntrySnapshotConfig,
}

impl<K, V, S> EntrySnapshotRequest<K, V, S> {
    /// Creates a request bound to `cache`, starting from the default
    /// configuration.
    pub(crate) fn new_with_cache(cache: Cache<K, V, S>) -> Self {
        let config = EntrySnapshotConfig::default();
        Self { cache, config }
    }

    /// Sets the `coldest` count of the request and returns the updated request.
    pub fn with_coldest(mut self, count: usize) -> Self {
        self.config.coldest = count;
        self
    }

    /// Sets the `hottest` count of the request and returns the updated request.
    pub fn with_hottest(mut self, count: usize) -> Self {
        self.config.hottest = count;
        self
    }

    /// Consumes the request and asks the cache to capture a snapshot using the
    /// accumulated configuration.
    pub fn capture(self) -> EntrySnapshot<K>
    where
        K: Hash + Send + Sync + Eq + 'static,
        V: Clone + Send + Sync + 'static,
        S: BuildHasher + Clone + Send + Sync + 'static,
    {
        let Self { cache, config } = self;
        cache.capture_entry_snapshot(config)
    }
}

/// The counts carried by a snapshot request. Both default to `0`.
#[derive(Clone, Debug, Default)]
pub(crate) struct EntrySnapshotConfig {
    /// The count set through `with_coldest`.
    pub(crate) coldest: usize,
    /// The count set through `with_hottest`.
    pub(crate) hottest: usize,
}
2 changes: 1 addition & 1 deletion src/common/time/instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub(crate) const MAX_NANOS: u64 = u64::MAX - 1;

/// `Instant` represents a point in time since the `Clock` was created. It has
/// nanosecond precision.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Instant {
elapsed_ns: u64,
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub use common::error::PredicateError;

#[cfg(any(feature = "sync", feature = "future"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "future"))))]
pub use common::entry::Entry;
pub use common::entry::{self, Entry};

#[cfg(any(feature = "sync", feature = "future"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "sync", feature = "future"))))]
Expand Down
Loading

0 comments on commit 5c0caaf

Please sign in to comment.