Skip to content

Commit

Permalink
Fix deque steal race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jul 30, 2021
1 parent 48b13dc commit 180b462
Showing 1 changed file with 59 additions and 39 deletions.
98 changes: 59 additions & 39 deletions crossbeam-deque/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,11 +710,13 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -816,16 +818,18 @@ impl<T> Stealer<T> {
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
return Steal::Retry;
}
Expand Down Expand Up @@ -856,11 +860,18 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(task);
Expand Down Expand Up @@ -1002,17 +1013,19 @@ impl<T> Stealer<T> {
}
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// Try incrementing the front index to steal the task.
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -1058,11 +1071,18 @@ impl<T> Stealer<T> {
let tmp = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(tmp);
Expand Down Expand Up @@ -1436,9 +1456,9 @@ impl<T> Injector<T> {

// Destroy the block if we've reached the end, or if another thread wanted to destroy
// but couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, offset);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
if (offset + 1 == BLOCK_CAP)
|| (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0)
{
Block::destroy(block, offset);
}

Expand Down

0 comments on commit 180b462

Please sign in to comment.