Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid to calculate the same hash twice in get, insert, invalidate, etc. methods #90

Merged
merged 2 commits into from
Feb 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 120 additions & 97 deletions src/cht/segment.rs

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
if let Some(kv) = self.base.remove_entry(key) {
let hash = self.base.hash(key);
if let Some(kv) = self.base.remove_entry(key, hash) {
let op = WriteOp::Remove(kv);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(&self.base.write_op_ch, op, hk)
Expand All @@ -620,7 +621,8 @@ where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
if let Some(kv) = self.base.remove_entry(key) {
let hash = self.base.hash(key);
if let Some(kv) = self.base.remove_entry(key, hash) {
let op = WriteOp::Remove(kv);
let hk = self.base.housekeeper.as_ref();
Self::blocking_schedule_write_op(&self.base.write_op_ch, op, hk)
Expand Down
15 changes: 11 additions & 4 deletions src/future/value_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ where

#[inline]
pub(crate) fn remove_waiter(&self, key: &Arc<K>, type_id: TypeId) {
let key = Arc::clone(key);
self.waiters.remove(&(key, type_id));
let (cht_key, hash) = self.cht_key_hash(key, type_id);
self.waiters.remove(&cht_key, hash);
}

#[inline]
Expand All @@ -246,9 +246,16 @@ where
type_id: TypeId,
waiter: &Waiter<V>,
) -> Option<Waiter<V>> {
let key = Arc::clone(key);
let (cht_key, hash) = self.cht_key_hash(key, type_id);
let waiter = TrioArc::clone(waiter);
self.waiters.insert_if_not_present((key, type_id), waiter)
self.waiters.insert_if_not_present(cht_key, hash, waiter)
}

#[inline]
fn cht_key_hash(&self, key: &Arc<K>, type_id: TypeId) -> ((Arc<K>, TypeId), u64) {
let cht_key = (Arc::clone(key), type_id);
let hash = self.waiters.hash(&cht_key);
(cht_key, hash)
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl<K> KeyHashDate<K> {
&self.key
}

pub(crate) fn hash(&self) -> u64 {
self.hash
}

pub(crate) fn entry_info(&self) -> &EntryInfo {
&self.entry_info
}
Expand Down
76 changes: 42 additions & 34 deletions src/sync/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ where
self.record_read_op(op).expect("Failed to record a get op");
};

match self.inner.get_key_value(key) {
match self.inner.get_key_value(key, hash) {
None => {
record(ReadOp::Miss(hash));
None
Expand Down Expand Up @@ -173,12 +173,12 @@ where
}

#[inline]
pub(crate) fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>>
pub(crate) fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.inner.remove_entry(key)
self.inner.remove_entry(key, hash)
}

#[inline]
Expand Down Expand Up @@ -274,6 +274,7 @@ where
// largest serial number is the one made by the last call of the closures.
self.inner.cache.insert_with_or_modify(
Arc::clone(&key),
hash,
// on_insert
|| {
let entry = self.new_value_entry(value.clone(), weight);
Expand Down Expand Up @@ -563,22 +564,22 @@ where
}

#[inline]
fn get_key_value<Q>(&self, key: &Q) -> Option<CacheEntry<K, V>>
fn get_key_value<Q>(&self, key: &Q, hash: u64) -> Option<CacheEntry<K, V>>
where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.cache.get_key_value(key)
self.cache.get_key_value(key, hash)
}

#[inline]
fn remove_entry<Q>(&self, key: &Q) -> Option<KvEntry<K, V>>
fn remove_entry<Q>(&self, key: &Q, hash: u64) -> Option<KvEntry<K, V>>
where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.cache
.remove_entry(key)
.remove_entry(key, hash)
.map(|(key, entry)| KvEntry::new(key, entry))
}

Expand Down Expand Up @@ -695,19 +696,20 @@ where
K: Hash + Eq,
S: BuildHasher,
{
fn get_value_entry(&self, key: &Arc<K>) -> Option<TrioArc<ValueEntry<K, V>>> {
self.cache.get(key)
fn get_value_entry(&self, key: &Arc<K>, hash: u64) -> Option<TrioArc<ValueEntry<K, V>>> {
self.cache.get(key, hash)
}

fn remove_key_value_if<F>(
&self,
key: &Arc<K>,
hash: u64,
condition: F,
) -> Option<TrioArc<ValueEntry<K, V>>>
where
F: FnMut(&Arc<K>, &TrioArc<ValueEntry<K, V>>) -> bool,
{
self.cache.remove_if(key, condition)
self.cache.remove_if(key, hash, condition)
}
}

Expand Down Expand Up @@ -942,7 +944,7 @@ where
if let Some(max) = self.max_capacity {
if new_weight as u64 > max {
// The candidate is too big to fit in the cache. Reject it.
self.cache.remove(&Arc::clone(&kh.key));
self.cache.remove(&Arc::clone(&kh.key), kh.hash);
return;
}
}
Expand All @@ -959,9 +961,9 @@ where
} => {
// Try to remove the victims from the cache (hash map).
for victim in victim_nodes {
if let Some((_vic_key, vic_entry)) = self
.cache
.remove_entry(unsafe { &victim.as_ref().element.key })
let element = unsafe { &victim.as_ref().element };
if let Some((_vic_key, vic_entry)) =
self.cache.remove_entry(element.key(), element.hash())
{
// And then remove the victim from the deques.
Self::handle_remove(deqs, vic_entry, counters);
Expand All @@ -980,7 +982,7 @@ where
AdmissionResult::Rejected { skipped_nodes: s } => {
skipped_nodes = s;
// Remove the candidate from the cache (hash map).
self.cache.remove(&Arc::clone(&kh.key));
self.cache.remove(&Arc::clone(&kh.key), kh.hash);
}
};

Expand Down Expand Up @@ -1032,10 +1034,11 @@ where
}
if let Some(victim) = next_victim.take() {
next_victim = victim.next_node();
let vic_elem = &victim.element;

if let Some(vic_entry) = cache.get(&victim.element.key) {
if let Some(vic_entry) = cache.get(vic_elem.key(), vic_elem.hash()) {
victims.add_policy_weight(vic_entry.policy_weight());
victims.add_frequency(freq, victim.element.hash);
victims.add_frequency(freq, vic_elem.hash());
victim_nodes.push(NonNull::from(victim));
retries = 0;
} else {
Expand Down Expand Up @@ -1167,31 +1170,31 @@ where
let va = &self.valid_after();
for _ in 0..batch_size {
// Peek the front node of the deque and check if it is expired.
let key = deq.peek_front().and_then(|node| {
let key_hash = deq.peek_front().and_then(|node| {
if is_expired_entry_ao(tti, va, &*node, now) {
Some(Arc::clone(node.element.key()))
Some((Arc::clone(node.element.key()), node.element.hash()))
} else {
None
}
});

if key.is_none() {
if key_hash.is_none() {
break;
}

let key = key.as_ref().unwrap();
let (key, hash) = key_hash.as_ref().map(|(k, h)| (k, *h)).unwrap();

// Remove the key from the map only when the entry is really
// expired. This check is needed because it is possible that the entry in
// the map has been updated or deleted but its deque node we checked
// above have not been updated yet.
let maybe_entry = self
.cache
.remove_if(key, |_, v| is_expired_entry_ao(tti, va, v, now));
.remove_if(key, hash, |_, v| is_expired_entry_ao(tti, va, v, now));

if let Some(entry) = maybe_entry {
Self::handle_remove_with_deques(deq_name, deq, write_order_deq, entry, counters);
} else if !self.try_skip_updated_entry(key, deq_name, deq, write_order_deq) {
} else if !self.try_skip_updated_entry(key, hash, deq_name, deq, write_order_deq) {
break;
}
}
Expand All @@ -1201,11 +1204,12 @@ where
fn try_skip_updated_entry(
&self,
key: &K,
hash: u64,
deq_name: &str,
deq: &mut Deque<KeyHashDate<K>>,
write_order_deq: &mut Deque<KeyDate<K>>,
) -> bool {
if let Some(entry) = self.cache.get(key) {
if let Some(entry) = self.cache.get(key, hash) {
if entry.last_accessed().is_none() {
// The key exists and the entry has been updated.
Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
Expand Down Expand Up @@ -1252,14 +1256,15 @@ where
}

let key = key.as_ref().unwrap();
let hash = self.hash(key);

let maybe_entry = self
.cache
.remove_if(key, |_, v| is_expired_entry_wo(ttl, va, v, now));
.remove_if(key, hash, |_, v| is_expired_entry_wo(ttl, va, v, now));

if let Some(entry) = maybe_entry {
Self::handle_remove(deqs, entry, counters);
} else if let Some(entry) = self.cache.get(key) {
} else if let Some(entry) = self.cache.get(key, hash) {
if entry.last_modified().is_none() {
deqs.move_to_back_ao(&entry);
deqs.move_to_back_wo(&entry);
Expand Down Expand Up @@ -1333,7 +1338,9 @@ where
while len < batch_size {
if let Some(kd) = iter.next() {
if let Some(ts) = kd.last_modified() {
candidates.push(KeyDateLite::new(&kd.key, ts));
let key = &kd.key;
let hash = self.hash(key);
candidates.push(KeyDateLite::new(key, hash, ts));
len += 1;
}
} else {
Expand Down Expand Up @@ -1363,17 +1370,18 @@ where
break;
}

let maybe_key_and_ts = deq.peek_front().map(|node| {
let maybe_key_hash_ts = deq.peek_front().map(|node| {
(
Arc::clone(node.element.key()),
node.element.hash(),
node.element.entry_info().last_modified(),
)
});

let (key, ts) = match maybe_key_and_ts {
Some((key, Some(ts))) => (key, ts),
Some((key, None)) => {
if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
let (key, hash, ts) = match maybe_key_hash_ts {
Some((key, hash, Some(ts))) => (key, hash, ts),
Some((key, hash, None)) => {
if self.try_skip_updated_entry(&key, hash, DEQ_NAME, deq, write_order_deq) {
continue;
} else {
break;
Expand All @@ -1382,7 +1390,7 @@ where
None => break,
};

let maybe_entry = self.cache.remove_if(&key, |_, v| {
let maybe_entry = self.cache.remove_if(&key, hash, |_, v| {
if let Some(lm) = v.last_modified() {
lm == ts
} else {
Expand All @@ -1394,7 +1402,7 @@ where
let weight = entry.policy_weight();
Self::handle_remove_with_deques(DEQ_NAME, deq, write_order_deq, entry, counters);
evicted = evicted.saturating_add(weight as u64);
} else if !self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
} else if !self.try_skip_updated_entry(&key, hash, DEQ_NAME, deq, write_order_deq) {
break;
}
}
Expand Down
11 changes: 10 additions & 1 deletion src/sync/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,16 @@ where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
if let Some(kv) = self.base.remove_entry(key) {
let hash = self.base.hash(key);
self.invalidate_with_hash(key, hash);
}

pub(crate) fn invalidate_with_hash<Q>(&self, key: &Q, hash: u64)
where
Arc<K>: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
if let Some(kv) = self.base.remove_entry(key, hash) {
let op = WriteOp::Remove(kv);
let hk = self.base.housekeeper.as_ref();
Self::schedule_write_op(&self.base.write_op_ch, op, hk).expect("Failed to remove");
Expand Down
Loading