Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: new internal semaphore based on intrusive lists #2325

Merged
merged 97 commits into from
Mar 23, 2020
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
bfc4414
sync: adds Notify for basic task notification
carllerche Feb 3, 2020
1890645
fix warnings
carllerche Feb 3, 2020
5e82767
wip
hawkw Feb 17, 2020
ea8ba09
wip2
hawkw Feb 18, 2020
a888cb7
let's see just how broken it is
hawkw Feb 19, 2020
9ae6fba
works but is horrible
hawkw Feb 20, 2020
ea499fe
wip
hawkw Feb 21, 2020
3b4a14f
fix weird logic
hawkw Feb 21, 2020
8836c38
fix close behavior
hawkw Feb 27, 2020
4c81c51
Merge branch 'master' into eliza/semaphore-2
hawkw Feb 27, 2020
5ee03b1
update to use new linked list
hawkw Feb 27, 2020
c4c685b
bring back old semaphore_ll so channels can work
hawkw Feb 29, 2020
1337d8e
add loom tests
hawkw Mar 6, 2020
19e0157
decr assigned permits when assigning to self
hawkw Mar 6, 2020
2764ba2
closed means REALLY closed
hawkw Mar 7, 2020
d5ed440
broken
hawkw Mar 9, 2020
cb472fd
wip broken
hawkw Mar 9, 2020
66e419b
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 9, 2020
bd1c5db
update after merge
hawkw Mar 9, 2020
1880121
almost evherything works
hawkw Mar 10, 2020
c7c08ff
wip
hawkw Mar 10, 2020
1a4f733
ALL LOOM TESTS PASS
hawkw Mar 11, 2020
a320d56
fix typoed fail ordering
hawkw Mar 11, 2020
df1e7bb
remove some warnings
hawkw Mar 11, 2020
b957af8
bring back try_acquire
hawkw Mar 12, 2020
ec2b4e7
misc cleanup
hawkw Mar 12, 2020
770d859
add test for release of leftover permits
hawkw Mar 12, 2020
38f654f
simplify `poll_acquire` behavior
hawkw Mar 12, 2020
9046b04
rm unneeded AtomicWaker
hawkw Mar 12, 2020
187e4d7
remove unused imports
hawkw Mar 12, 2020
c2450a2
add test for cancelling an acquire future
hawkw Mar 12, 2020
ddc1726
dropping an incomplete `Acquire` releases permits
hawkw Mar 12, 2020
1aa8088
no need to lock if unqueued
hawkw Mar 12, 2020
88b5e86
remove dbg/println
hawkw Mar 12, 2020
cf27256
remove more dbg/printlns
hawkw Mar 12, 2020
cb73e3f
add comments/remove dbgs
hawkw Mar 12, 2020
ce66762
add comments/cleanup
hawkw Mar 13, 2020
9f49f80
rm tests for stuff you can't do w new semaphore
hawkw Mar 13, 2020
fea9fd2
clean up & add comments
hawkw Mar 13, 2020
df6c556
rewrap comments, fix unused warnings
hawkw Mar 16, 2020
201db12
more wrapping
hawkw Mar 16, 2020
9f3f544
style/clean up
hawkw Mar 16, 2020
8628723
revert unneeded MPSC changes
hawkw Mar 16, 2020
49ed5dd
revert unneeded MPSC test changes
hawkw Mar 16, 2020
f5f5bc1
Apply suggestions from code review
hawkw Mar 16, 2020
24479c5
remove printlns from tests
hawkw Mar 16, 2020
6275513
checks to ensure API types remain Send/Sync/Unpin
hawkw Mar 16, 2020
f86147a
style: more consistent imports
hawkw Mar 16, 2020
9206558
make docs more accurate
hawkw Mar 16, 2020
0bcb271
Merge branch 'eliza/semaphore-2' of github.com:tokio-rs/tokio into el…
hawkw Mar 16, 2020
b412284
comments style improvements
hawkw Mar 16, 2020
3b4f416
make LinkedList::is_linked more misuse-resistant
hawkw Mar 16, 2020
c2c3e04
fix feature flag unhappiness
hawkw Mar 16, 2020
c9e122f
quick rwlock benches
hawkw Mar 17, 2020
034e141
quick semaphore benches
hawkw Mar 17, 2020
bffa18b
Merge branch 'eliza/bench-sync' into eliza/semaphore-2
hawkw Mar 17, 2020
0e0e13d
add `LinkedList::split_back` method
hawkw Mar 17, 2020
705afe1
add DoubleEndedIterator for linked_list::Iter~
hawkw Mar 17, 2020
c9e4800
(WIP) move notification outside of lock
hawkw Mar 17, 2020
389e231
clean up
hawkw Mar 17, 2020
a18d569
fixup
hawkw Mar 17, 2020
f758fb6
rm unused
hawkw Mar 18, 2020
8231eb7
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 18, 2020
90f98c9
put back coop yielding in acquire futures
hawkw Mar 18, 2020
32a12bb
(hopefully) fix feature flags
hawkw Mar 18, 2020
5c76e67
move waiter state inside of lock
hawkw Mar 19, 2020
95bb327
remove dbg
hawkw Mar 19, 2020
c9699dc
cleanup
hawkw Mar 19, 2020
da1b027
remove unneeded SeqCst
hawkw Mar 20, 2020
819a59f
notify outside of lock on close, too
hawkw Mar 20, 2020
29fea22
carl-style feature flags
hawkw Mar 20, 2020
bdce157
ag
hawkw Mar 20, 2020
d9bf8b2
simplify permit struct significantly
hawkw Mar 20, 2020
7406d54
simplify lock/cas loop interaction
hawkw Mar 20, 2020
1f8afa6
simplify lock/cas loop interaction, episode II
hawkw Mar 20, 2020
2d8d413
remove permit struct entirely
hawkw Mar 20, 2020
cc2afac
unweaken cas
hawkw Mar 20, 2020
5573c65
fix backwards assertions in linked-list tests
hawkw Mar 20, 2020
e3c4b9f
rustfmt
hawkw Mar 21, 2020
23257c1
fixup loom test
hawkw Mar 21, 2020
4de0d58
LinkedList fmt::Debug impls shouldn't need T: Debug
hawkw Mar 21, 2020
f22a180
fix racy close behavior
hawkw Mar 21, 2020
9c9354b
run rustfmt on file cargo fmt doesn't see
hawkw Mar 21, 2020
2f1ead5
don't set unqueued when errored
hawkw Mar 22, 2020
077a61f
bring back atomic permit counter
hawkw Mar 22, 2020
be77a72
simplify a few things
hawkw Mar 22, 2020
d20a93a
expect instead of unwrap
hawkw Mar 22, 2020
be8df9e
ensure node pointers don't dangle on panics
hawkw Mar 22, 2020
af8273f
cleanup/docs
hawkw Mar 22, 2020
4bb9fb1
fix max permits, use low bits for flags
hawkw Mar 23, 2020
67fe420
const-ify permit shift amount
hawkw Mar 23, 2020
4d84ec7
check before reregistering waker/reduce unsafe scope
hawkw Mar 23, 2020
f1e3c4c
add overflow checks
hawkw Mar 23, 2020
a82c043
add comments explaining notification
hawkw Mar 23, 2020
88fa1aa
fix backwards assertion
hawkw Mar 23, 2020
0a01a59
Merge branch 'master' into eliza/semaphore-2
hawkw Mar 23, 2020
449f8b1
document future causalcell improvement
hawkw Mar 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
727 changes: 727 additions & 0 deletions tokio/src/sync/batch_semaphore.rs

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ cfg_sync! {

pub mod oneshot;

pub(crate) mod batch_semaphore;
pub(crate) mod semaphore_ll;
mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit};
Expand Down
23 changes: 8 additions & 15 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@
//!
//! [`Mutex`]: struct.Mutex.html
//! [`MutexGuard`]: struct.MutexGuard.html

use crate::future::poll_fn;
use crate::sync::semaphore_ll as semaphore;
use crate::sync::batch_semaphore as semaphore;

use std::cell::UnsafeCell;
use std::error::Error;
Expand Down Expand Up @@ -152,18 +150,13 @@ impl<T> Mutex<T> {

/// A future that resolves on acquiring the lock and returns the `MutexGuard`.
pub async fn lock(&self) -> MutexGuard<'_, T> {
let mut guard = MutexGuard {
lock: self,
permit: semaphore::Permit::new(),
};
poll_fn(|cx| guard.permit.poll_acquire(cx, 1, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
guard
let mut permit = semaphore::Permit::new();
permit.acquire(1, &self.s).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
MutexGuard { lock: self, permit }
}

/// Tries to acquire the lock
Expand Down
37 changes: 13 additions & 24 deletions tokio/src/sync/rwlock.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::future::poll_fn;
use crate::sync::semaphore_ll::{AcquireError, Permit, Semaphore};
use crate::sync::batch_semaphore::{AcquireError, Permit, Semaphore};
use std::cell::UnsafeCell;
use std::ops;
use std::task::{Context, Poll};

#[cfg(not(loom))]
const MAX_READS: usize = 32;
Expand Down Expand Up @@ -114,12 +112,8 @@ struct ReleasingPermit<'a, T> {
}

impl<'a, T> ReleasingPermit<'a, T> {
fn poll_acquire(
&mut self,
cx: &mut Context<'_>,
s: &Semaphore,
) -> Poll<Result<(), AcquireError>> {
self.permit.poll_acquire(cx, self.num_permits, s)
async fn acquire(&mut self) -> Result<(), AcquireError> {
self.permit.acquire(self.num_permits, &self.lock.s).await
}
}

Expand Down Expand Up @@ -191,14 +185,11 @@ impl<T> RwLock<T> {
permit: Permit::new(),
lock: self,
};

poll_fn(|cx| permit.poll_acquire(cx, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
permit.acquire().await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
RwLockReadGuard { lock: self, permit }
}

Expand Down Expand Up @@ -231,13 +222,11 @@ impl<T> RwLock<T> {
lock: self,
};

poll_fn(|cx| permit.poll_acquire(cx, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
permit.acquire().await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});

RwLockWriteGuard { lock: self, permit }
}
Expand Down
15 changes: 6 additions & 9 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::semaphore_ll as ll; // low level implementation
use crate::future::poll_fn;
use super::batch_semaphore as ll; // low level implementation

/// Counting semaphore performing asynchronous permit aquisition.
///
Expand Down Expand Up @@ -56,14 +55,12 @@ impl Semaphore {

/// Acquires permit from the semaphore
pub async fn acquire(&self) -> SemaphorePermit<'_> {
let mut permit = SemaphorePermit {
let mut ll_permit = ll::Permit::new();
ll_permit.acquire(1, &self.ll_sem).await.unwrap();
SemaphorePermit {
sem: &self,
ll_permit: ll::Permit::new(),
};
poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem))
.await
.unwrap();
permit
ll_permit,
}
}

/// Tries to acquire a permit form the semaphore
Expand Down
1 change: 1 addition & 0 deletions tokio/src/sync/semaphore_ll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ impl Permit {
}

/// Returns `true` if the permit has been acquired
#[allow(dead_code)] // may be used later
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm usually against allowing dead_code, but in this case, I think it is a good choice. Reason: the entire semaphore_ll.rs file is planed to be removed.

pub(crate) fn is_acquired(&self) -> bool {
match self.state {
PermitState::Acquired(num) if num > 0 => true,
Expand Down
192 changes: 192 additions & 0 deletions tokio/src/sync/tests/loom_semaphore_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use crate::sync::batch_semaphore::*;

use futures::future::poll_fn;
use loom::future::block_on;
use loom::thread;
use std::future::Future;
use std::pin::Pin;
use loom::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
use std::task::Poll::Ready;
use std::task::{Context, Poll};

#[test]
fn basic_usage() {
const NUM: usize = 2;

struct Shared {
semaphore: Semaphore,
active: AtomicUsize,
}

async fn actor(shared: Arc<Shared>) {
let mut permit = Permit::new();
permit.acquire(1, &shared.semaphore).await.unwrap();
println!("acquired!");
hawkw marked this conversation as resolved.
Show resolved Hide resolved
let actual = shared.active.fetch_add(1, SeqCst);
assert!(actual <= NUM - 1);

let actual = shared.active.fetch_sub(1, SeqCst);
assert!(actual <= NUM);

permit.release(1, &shared.semaphore);
}

loom::model(|| {

let shared = Arc::new(Shared {
semaphore: Semaphore::new(NUM),
active: AtomicUsize::new(0),
});

for _ in 0..NUM {
let shared = shared.clone();

thread::spawn(move || {
block_on(actor(shared));
});
}

block_on(actor(shared));
});
}

#[test]
fn release() {
loom::model(|| {
let semaphore = Arc::new(Semaphore::new(1));

{
let semaphore = semaphore.clone();
thread::spawn(move || {
let mut permit = Permit::new();

block_on(permit.acquire(1, &semaphore)).unwrap();

permit.release(1, &semaphore);
});
}

let mut permit = Permit::new();

block_on(permit.acquire(1, &semaphore)).unwrap();

permit.release(1, &semaphore);
});
}

#[test]
fn basic_closing() {
const NUM: usize = 2;

loom::model(|| {
println!("-- iter --");
hawkw marked this conversation as resolved.
Show resolved Hide resolved
let semaphore = Arc::new(Semaphore::new(1));

for _ in 0..NUM {
let semaphore = semaphore.clone();

thread::spawn(move || {
let mut permit = Permit::new();

for _ in 0..2 {
block_on(permit.acquire(1, &semaphore)).map_err(|_|())?;

permit.release(1, &semaphore);
}

Ok::<(), ()>(())
});
}

semaphore.close();
});
}

#[test]
fn concurrent_close() {
const NUM: usize = 3;

loom::model(|| {
let semaphore = Arc::new(Semaphore::new(1));

for _ in 0..NUM {
let semaphore = semaphore.clone();

thread::spawn(move || {
let mut permit = Permit::new();

block_on(permit.acquire(1, &semaphore)).map_err(|_|())?;

permit.release(1, &semaphore);

semaphore.close();

Ok::<(), ()>(())
});
}
});
}

#[test]
fn batch() {
let mut b = loom::model::Builder::new();
b.preemption_bound = Some(1);

b.check(|| {
let semaphore = Arc::new(Semaphore::new(10));
let active = Arc::new(AtomicUsize::new(0));
let mut ths = vec![];

for _ in 0..2 {
let semaphore = semaphore.clone();
let active = active.clone();

ths.push(thread::spawn(move || {
let mut permit = Permit::new();

for n in &[4, 10, 8] {
block_on(permit.acquire(*n, &semaphore)).unwrap();

active.fetch_add(*n as usize, SeqCst);

let num_active = active.load(SeqCst);
assert!(num_active <= 10);

thread::yield_now();

active.fetch_sub(*n as usize, SeqCst);

permit.release(*n, &semaphore);
}
}));
}

for th in ths.into_iter() {
th.join().unwrap();
}

assert_eq!(10, semaphore.available_permits());
});
}

#[test]
fn release_during_acquire() {
loom::model(|| {
let semaphore = Arc::new(Semaphore::new(10));
let mut permit1 = Permit::new();
permit1.try_acquire(8, &semaphore).expect("try_acquire should succeed; semaphore uncontended");
let semaphore2 = semaphore.clone();
let thread = thread::spawn(move || {
let mut permit = Permit::new();
block_on(permit.acquire(4, &semaphore2)).unwrap();
permit
});

permit1.release(8, &semaphore);
let mut permit2 = thread.join().unwrap();
permit2.release(4, &semaphore);
assert_eq!(10, semaphore.available_permits());
})
}
2 changes: 2 additions & 0 deletions tokio/src/sync/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
cfg_not_loom! {
mod atomic_waker;
mod semaphore_ll;
mod semaphore_batch;
}

cfg_loom! {
Expand All @@ -10,5 +11,6 @@ cfg_loom! {
mod loom_mpsc;
mod loom_notify;
mod loom_oneshot;
mod loom_semaphore_batch;
mod loom_semaphore_ll;
}
Loading