Skip to content

Commit

Permalink
bugfix: Handle interrupts while polling
Browse files Browse the repository at this point in the history
Previous, `Poller::wait` would bubble signal interruption error to the user.
However, this may be unexpected for simple use cases. Thus, this commit makes
it so, if `ErrorKind::Interrupted` is received by the underlying `wait()` call,
it clears the events and tries to wait again.

This also adds a test for this interruption written by @psychon.

Co-Authored-By: Uli Schlachter <psychon@users.noreply.github.com>
Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull and psychon authored Oct 27, 2023
1 parent 0575cbd commit b9ab821
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 9 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ features = [
[dev-dependencies]
easy-parallel = "3.1.0"
fastrand = "2.0.0"

[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"
signal-hook = "0.3.17"
34 changes: 25 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, Instant};

use cfg_if::cfg_if;

Expand Down Expand Up @@ -651,14 +651,30 @@ impl Poller {
let _enter = span.enter();

if let Ok(_lock) = self.lock.try_lock() {
// Wait for I/O events.
self.poller.wait(&mut events.events, timeout)?;

// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);

// Indicate number of events.
Ok(events.len())
let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout));

loop {
// Figure out how long to wait for.
let timeout =
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));

// Wait for I/O events.
if let Err(e) = self.poller.wait(&mut events.events, timeout) {
// If the wait was interrupted by a signal, clear events and try again.
if e.kind() == io::ErrorKind::Interrupted {
events.clear();
continue;
} else {
return Err(e);
}
}

// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);

// Indicate number of events.
return Ok(events.len());
}
} else {
tracing::trace!("wait: skipping because another thread is already waiting on I/O");
Ok(0)
Expand Down
47 changes: 47 additions & 0 deletions tests/concurrent_modification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,53 @@ fn concurrent_modify() -> io::Result<()> {
Ok(())
}

#[cfg(unix)]
#[test]
fn concurrent_interruption() -> io::Result<()> {
struct MakeItSend<T>(T);
unsafe impl<T> Send for MakeItSend<T> {}

let (reader, _writer) = tcp_pair()?;
let poller = Poller::new()?;
unsafe {
poller.add(&reader, Event::none(0))?;
}

let mut events = Events::new();
let events_borrow = &mut events;
let (sender, receiver) = std::sync::mpsc::channel();

Parallel::new()
.add(move || {
// Register a signal handler so that the syscall is actually interrupted. A signal that
// is ignored by default does not cause an interrupted syscall.
signal_hook::flag::register(signal_hook::consts::signal::SIGURG, Default::default())?;

// Signal to the other thread how to send a signal to us
sender
.send(MakeItSend(unsafe { libc::pthread_self() }))
.unwrap();

poller.wait(events_borrow, Some(Duration::from_secs(1)))?;
Ok(())
})
.add(move || {
let MakeItSend(target_thread) = receiver.recv().unwrap();
thread::sleep(Duration::from_millis(100));
assert_eq!(0, unsafe {
libc::pthread_kill(target_thread, libc::SIGURG)
});
Ok(())
})
.run()
.into_iter()
.collect::<io::Result<()>>()?;

assert_eq!(events.len(), 0);

Ok(())
}

fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?;
Expand Down

0 comments on commit b9ab821

Please sign in to comment.