Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove probe_loop in favor of Iterator #265

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 104 additions & 109 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,31 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
hash: u64,
mut eq: impl FnMut(&K) -> bool,
) -> Result<Shared<'g, Bucket<K, V>>, RelocatedError> {
let loop_result = self.probe_loop(guard, hash, |_, _, this_bucket_ptr| {
for bucket in self.probe(guard, hash) {
let Ok((_, _, this_bucket_ptr)) = bucket else { return Err(RelocatedError); };

let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Not found.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
};

if !eq(&this_bucket_ref.key) {
// Different key. Try next bucket
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
// Not found. (It has been removed)
ProbeLoopAction::Return(Shared::null())
return Ok(Shared::null());
} else {
// Found.
ProbeLoopAction::Return(this_bucket_ptr)
return Ok(this_bucket_ptr);
}
});

match loop_result {
ProbeLoopResult::Returned(t) => Ok(t),
ProbeLoopResult::LoopEnded => Ok(Shared::null()),
ProbeLoopResult::FoundSentinelTag => Err(RelocatedError),
}

Ok(Shared::null())
}

pub(crate) fn remove_if<F>(
Expand All @@ -121,31 +119,34 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
where
F: FnMut(&K, &V) -> bool,
{
let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((_, this_bucket, this_bucket_ptr)) = bucket else { return Err(condition); };

let this_bucket_ref = if let Some(r) = unsafe { this_bucket_ptr.as_ref() } {
r
} else {
// Nothing to remove.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
};

let this_key = &this_bucket_ref.key;

if !eq(this_key) {
// Different key. Try next bucket.
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
// Already removed.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
}

let this_value = unsafe { &*this_bucket_ref.maybe_value.as_ptr() };

if !condition(this_key, this_value) {
// Found but the condition is false. Do not remove.
return ProbeLoopAction::Return(Shared::null());
return Ok(Shared::null());
}

// Found and the condition is true. Remove it. (Make it a tombstone)
Expand All @@ -160,45 +161,35 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard,
) {
// Succeeded. Return the removed value. (can be null)
Ok(_) => ProbeLoopAction::Return(new_bucket_ptr),
Ok(_) => return Ok(new_bucket_ptr),
// Failed. Reload to retry.
Err(_) => ProbeLoopAction::Reload,
Err(_) => probe.reload(),
}
});

match loop_result {
ProbeLoopResult::Returned(t) => Ok(t),
ProbeLoopResult::LoopEnded => Ok(Shared::null()),
ProbeLoopResult::FoundSentinelTag => Err(condition),
}

Ok(Shared::null())
}

pub(crate) fn insert_if_not_present<F>(
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut state: InsertOrModifyState<K, V, F>,
) -> Result<InsertionResult<'g, K, V>, InsertOrModifyState<K, V, F>>
where
F: FnOnce() -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();

let mut probe = self.probe(guard, hash);
while let Some(Ok((_, this_bucket, this_bucket_ptr))) = probe.next() {
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
if &this_bucket_ref.key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
continue;
}

if !is_tombstone(this_bucket_ptr) {
// Found. Return it.
return ProbeLoopAction::Return(InsertionResult::AlreadyPresent(
this_bucket_ptr,
));
return Ok(InsertionResult::AlreadyPresent(this_bucket_ptr));
}
}

Expand All @@ -213,18 +204,18 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
Ordering::Relaxed,
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));
ProbeLoopAction::Reload
state = InsertOrModifyState::from_bucket_value(new, None);
probe.reload();
} else if unsafe { this_bucket_ptr.as_ref() }.is_some() {
// Inserted by replacing a tombstone.
ProbeLoopAction::Return(InsertionResult::ReplacedTombstone(this_bucket_ptr))
return Ok(InsertionResult::ReplacedTombstone(this_bucket_ptr));
} else {
// Inserted.
ProbeLoopAction::Return(InsertionResult::Inserted)
return Ok(InsertionResult::Inserted);
}
});
}

loop_result.returned().ok_or_else(|| maybe_state.unwrap())
Err(state)
}

// https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity
Expand All @@ -233,26 +224,24 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
&self,
guard: &'g Guard,
hash: u64,
state: InsertOrModifyState<K, V, F>,
mut state: InsertOrModifyState<K, V, F>,
mut modifier: G,
) -> Result<Shared<'g, Bucket<K, V>>, (InsertOrModifyState<K, V, F>, G)>
where
F: FnOnce() -> V,
G: FnMut(&K, &V) -> V,
{
let mut maybe_state = Some(state);

let loop_result = self.probe_loop(guard, hash, |_, this_bucket, this_bucket_ptr| {
let state = maybe_state.take().unwrap();
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((_, this_bucket, this_bucket_ptr)) = bucket else { return Err((state, modifier)); };

let (new_bucket, maybe_insert_value) =
if let Some(this_bucket_ref) = unsafe { this_bucket_ptr.as_ref() } {
let this_key = &this_bucket_ref.key;

if this_key != state.key() {
// Different key. Try next bucket.
maybe_state = Some(state);
return ProbeLoopAction::Continue;
continue;
}

if is_tombstone(this_bucket_ptr) {
Expand Down Expand Up @@ -280,20 +269,15 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
guard,
) {
// Failed. Reload to retry.
maybe_state = Some(InsertOrModifyState::from_bucket_value(
new,
maybe_insert_value,
));
ProbeLoopAction::Reload
state = InsertOrModifyState::from_bucket_value(new, maybe_insert_value);
probe.reload();
} else {
// Succeeded. Return the previous value. (can be null)
ProbeLoopAction::Return(this_bucket_ptr)
return Ok(this_bucket_ptr);
}
});
}

loop_result
.returned()
.ok_or_else(|| (maybe_state.unwrap(), modifier))
Err((state, modifier))
}

fn insert_for_grow(
Expand All @@ -308,19 +292,22 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let key = &unsafe { bucket_ptr.deref() }.key;

let loop_result = self.probe_loop(guard, hash, |i, this_bucket, this_bucket_ptr| {
let mut probe = self.probe(guard, hash);
while let Some(bucket) = probe.next() {
let Ok((i, this_bucket, this_bucket_ptr)) = bucket else { return None; };

if let Some(Bucket { key: this_key, .. }) = unsafe { this_bucket_ptr.as_ref() } {
if this_bucket_ptr == bucket_ptr {
return ProbeLoopAction::Return(None);
return None;
} else if this_key != key {
return ProbeLoopAction::Continue;
continue;
} else if !is_borrowed(this_bucket_ptr) {
return ProbeLoopAction::Return(None);
return None;
}
}

if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
return None;
} else if this_bucket
.compare_exchange_weak(
this_bucket_ptr,
Expand All @@ -331,13 +318,13 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
)
.is_ok()
{
ProbeLoopAction::Return(Some(i))
return Some(i);
} else {
ProbeLoopAction::Reload
probe.reload();
}
});
}

loop_result.returned().flatten()
None
}

pub(crate) fn keys<F, T>(
Expand Down Expand Up @@ -368,34 +355,63 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
}
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe_loop<F, T>(&self, guard: &'g Guard, hash: u64, mut f: F) -> ProbeLoopResult<T>
where
F: FnMut(usize, &Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>) -> ProbeLoopAction<T>,
{
let offset = hash as usize & (self.buckets.len() - 1);
struct Probe<'b, 'g, K: 'g, V: 'g> {
buckets: &'b [Atomic<Bucket<K, V>>],
guard: &'g Guard,
this_bucket: (usize, &'b Atomic<Bucket<K, V>>),
offset: usize,

for i in
(0..self.buckets.len()).map(|i| (i.wrapping_add(offset)) & (self.buckets.len() - 1))
{
let this_bucket = &self.buckets[i];
i: usize,
reload: bool,
}

loop {
let this_bucket_ptr = this_bucket.load_consume(guard);
impl<'b, 'g, K: 'g, V: 'g> Probe<'b, 'g, K, V> {
fn reload(&mut self) {
self.reload = true;
}
}

if is_sentinel(this_bucket_ptr) {
return ProbeLoopResult::FoundSentinelTag;
}
impl<'b, 'g, K: 'g, V: 'g> Iterator for Probe<'b, 'g, K, V> {
type Item = Result<(usize, &'b Atomic<Bucket<K, V>>, Shared<'g, Bucket<K, V>>), ()>;

match f(i, this_bucket, this_bucket_ptr) {
ProbeLoopAction::Continue => break,
ProbeLoopAction::Reload => (),
ProbeLoopAction::Return(t) => return ProbeLoopResult::Returned(t),
}
fn next(&mut self) -> Option<Self::Item> {
if !self.reload {
let max = self.buckets.len() - 1;
if self.i >= max {
return None;
}
self.i += 1;
let i = self.i.wrapping_add(self.offset) & max;
self.this_bucket = (i, &self.buckets[i]);
}
self.reload = false;

let this_bucket_ptr = self.this_bucket.1.load_consume(self.guard);

ProbeLoopResult::LoopEnded
if is_sentinel(this_bucket_ptr) {
return Some(Err(()));
}

let val = (self.this_bucket.0, self.this_bucket.1, this_bucket_ptr);
Some(Ok(val))
}
}

impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
fn probe(&self, guard: &'g Guard, hash: u64) -> Probe<'_, 'g, K, V> {
let buckets = &self.buckets;
let offset = hash as usize & (buckets.len() - 1);
// FIXME: this will panic if `len() == 0`
let this_bucket = (offset, &buckets[offset]);
Comment on lines +404 to +405
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change I’m a bit uncertain about. Maybe using an Option for this_bucket could avoid that, and also remove some panic code as well. If, as you mention, you want to optimize the probing itself either way, it might not be worth doing this immediately, provided len will indeed never be 0.

Copy link
Member

@tatsuya6502 tatsuya6502 Jun 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the cht code and confirmed that len will never be 0. So this line will never cause a panic.

A BucketArray can be created by the default method or the with_length function.

default uses 128 as len:

Self::with_length(0, BUCKET_ARRAY_DEFAULT_LENGTH)

with_length takes an arbitrary len as a usize, but its caller (below) will never call it with 0:

moka/src/cht/segment.rs

Lines 170 to 184 in e5690f8

if capacity == 0 {
unsafe {
ptr::write_bytes(segments.as_mut_ptr(), 0, actual_num_segments);
segments.set_len(actual_num_segments);
}
} else {
let actual_capacity = (capacity * 2 / actual_num_segments).next_power_of_two();
for _ in 0..actual_num_segments {
segments.push(Segment {
bucket_array: Atomic::new(BucketArray::with_length(0, actual_capacity)),
len: AtomicUsize::new(0),
});
}
}

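| capacity | actual_num_segments | actual_capacity (len) |
|----------|---------------------|-----------------------|
| 0 | any | n/a (does not call with_length) |
| 1 | 1 | 2 |
| 1 | 2 | 1 |
| 1 | 4 | 1 |
| 2 | 1 | 4 |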

Probe {
buckets,
guard,
this_bucket,
offset,

i: 0,
reload: true,
}
}

pub(crate) fn rehash<H>(
Expand Down Expand Up @@ -645,27 +661,6 @@ where
hasher.finish()
}

enum ProbeLoopAction<T> {
Continue,
Reload,
Return(T),
}

enum ProbeLoopResult<T> {
LoopEnded,
FoundSentinelTag,
Returned(T),
}

impl<T> ProbeLoopResult<T> {
fn returned(self) -> Option<T> {
match self {
Self::Returned(t) => Some(t),
Self::LoopEnded | Self::FoundSentinelTag => None,
}
}
}

pub(crate) enum InsertionResult<'g, K, V> {
AlreadyPresent(Shared<'g, Bucket<K, V>>),
Inserted,
Expand Down