Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: cancel all in-flight ops on driver drop #158

Merged
merged 3 commits into from
Nov 3, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ mod write;

mod writev;

use crate::driver::op::Lifecycle;
use io_uring::opcode::AsyncCancel;
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
use io_uring::IoUring;
use slab::Slab;
use std::io;
Expand Down Expand Up @@ -75,6 +77,8 @@ impl Driver {
self.uring.submit_and_wait(1)
}

// only used in tests rn
#[allow(unused)]
fn num_operations(&self) -> usize {
self.ops.lifecycle.len()
}
Expand Down Expand Up @@ -107,9 +111,10 @@ impl Driver {
Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => {
self.tick();
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
}
Err(e) => {
Err(e) if e.raw_os_error() != Some(libc::EINTR) => {
return Err(e);
}
_ => continue,
}
}
}
Expand All @@ -121,9 +126,54 @@ impl AsRawFd for Driver {
}
}

/// Drop the driver, cancelling any in-progress ops and waiting for them to terminate.
///
/// This first cancels all ops and then waits for them to be moved to the completed lifecycle phase.
///
/// It is possible for this to be run without previously dropping the runtime, but this should only
/// be possible in the case of [`std::process::exit`].
///
/// This depends on us knowing when ops are completed and done firing.
/// When multishot ops are added (support exists but none are implemented), a way to know if such
/// an op is finished MUST be added, otherwise our shutdown process is unsound.
impl Drop for Driver {
fn drop(&mut self) {
while self.num_operations() > 0 {
// get all ops in flight for cancellation
while !self.uring.submission().is_empty() {
self.submit().expect("Internal error when dropping driver");
}

// pre-determine what to cancel
let mut cancellable_ops = Vec::new();
for (id, cycle) in self.ops.lifecycle.iter() {
// don't cancel completed items
if !matches!(cycle, Lifecycle::Completed(_)) {
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
cancellable_ops.push(id);
}
}

// cancel all ops
for id in cancellable_ops {
unsafe {
while self
.uring
.submission()
.push(&AsyncCancel::new(id as u64).build().user_data(u64::MAX))
.is_err()
{
self.submit().expect("Internal error when dropping driver");
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// TODO: add a way to know if a multishot op is done sending completions
// SAFETY: this is currently unsound for multishot ops
while !self
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
.ops
.lifecycle
.iter()
.all(|(_, cycle)| matches!(cycle, Lifecycle::Completed(_)))
{
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
// If waiting fails, ignore the error. The wait will be attempted
// again on the next loop.
let _ = self.wait();
Expand Down Expand Up @@ -167,7 +217,9 @@ impl Ops {

impl Drop for Ops {
fn drop(&mut self) {
assert!(self.lifecycle.is_empty());
assert!(self.completions.is_empty());
assert!(self
Noah-Kennedy marked this conversation as resolved.
Show resolved Hide resolved
.lifecycle
.iter()
.all(|(_, cycle)| matches!(cycle, Lifecycle::Completed(_))))
}
}