Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(runtime): make Attacher a container #154

Merged
merged 9 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions compio-driver/src/fusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ mod iour;

pub(crate) mod op;

pub use iour::OpCode as IourOpCode;
pub use poll::OpCode as PollOpCode;
#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{io, task::Poll, time::Duration};

pub use driver_type::DriverType;
pub use iour::OpCode as IourOpCode;
pub(crate) use iour::{sockaddr_storage, socklen_t};
pub use poll::Decision;
pub use poll::{Decision, OpCode as PollOpCode};
use slab::Slab;

pub(crate) use crate::unix::RawOp;
Expand All @@ -34,7 +33,7 @@ mod driver_type {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DriverType {
/// Using `polling` driver
Poll = POLLING,
Poll = POLLING,

/// Using `io-uring` driver
IoUring = IO_URING,
Expand Down
9 changes: 3 additions & 6 deletions compio-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
# Workspace dependencies
compio-buf = { workspace = true, optional = true }
compio-buf = { workspace = true }
compio-driver = { workspace = true }
compio-io = { workspace = true, optional = true }
compio-runtime = { workspace = true, optional = true }
compio-io = { workspace = true }
compio-runtime = { workspace = true }

# Windows specific dependencies
[target.'cfg(windows)'.dependencies]
Expand Down Expand Up @@ -55,6 +55,3 @@ windows-sys = { workspace = true, features = ["Win32_Security_Authorization"] }
# Unix specific dev dependencies
[target.'cfg(unix)'.dev-dependencies]
nix = { workspace = true, features = ["fs"] }

[features]
runtime = ["dep:compio-buf", "dep:compio-io", "dep:compio-runtime"]
95 changes: 24 additions & 71 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use std::{fs::Metadata, io};
use std::{fs::Metadata, future::Future, io, mem::ManuallyDrop, path::Path};

use compio_driver::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
#[cfg(feature = "runtime")]
use {
crate::OpenOptions,
compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut},
compio_driver::op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt},
compio_io::{AsyncReadAt, AsyncWriteAt},
compio_runtime::{Attachable, Attacher, Runtime},
std::{future::Future, mem::ManuallyDrop, path::Path},
use compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut};
use compio_driver::op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt};
use compio_io::{AsyncReadAt, AsyncWriteAt};
use compio_runtime::{
impl_attachable, impl_try_as_raw_fd, Attacher, Runtime, TryAsRawFd, TryClone,
};
#[cfg(all(feature = "runtime", unix))]
#[cfg(unix)]
use {
compio_buf::{IoVectoredBuf, IoVectoredBufMut},
compio_driver::op::{ReadVectoredAt, WriteVectoredAt},
};

use crate::OpenOptions;

/// A reference to an open file on the filesystem.
///
/// An instance of a `File` can be read and/or written depending on what options
Expand All @@ -24,16 +22,13 @@ use {
/// required to specify an offset when issuing an operation.
#[derive(Debug)]
pub struct File {
inner: std::fs::File,
#[cfg(feature = "runtime")]
attacher: Attacher,
inner: Attacher<std::fs::File>,
}

impl File {
/// Attempts to open a file in read-only mode.
///
/// See the [`OpenOptions::open`] method for more details.
#[cfg(feature = "runtime")]
pub async fn open(path: impl AsRef<Path>) -> io::Result<Self> {
OpenOptions::new().read(true).open(path).await
}
Expand All @@ -44,7 +39,6 @@ impl File {
/// and will truncate it if it does.
///
/// See the [`OpenOptions::open`] function for more details.
#[cfg(feature = "runtime")]
pub async fn create(path: impl AsRef<Path>) -> io::Result<Self> {
OpenOptions::new()
.create(true)
Expand All @@ -56,14 +50,13 @@ impl File {

/// Close the file. If the returned future is dropped before polling, the
/// file won't be closed.
#[cfg(feature = "runtime")]
pub fn close(self) -> impl Future<Output = io::Result<()>> {
// Make sure that self won't be dropped after `close` called.
// Users may call this method and drop the future immediately. In that way the
// `close` should be cancelled.
let this = ManuallyDrop::new(self);
async move {
let op = CloseFile::new(this.as_raw_fd());
let op = CloseFile::new(this.inner.try_as_raw_fd()?);
Runtime::current().submit(op).await.0?;
Ok(())
}
Expand All @@ -75,22 +68,16 @@ impl File {
/// It does not clear the attach state.
pub fn try_clone(&self) -> io::Result<Self> {
let inner = self.inner.try_clone()?;
Ok(Self {
#[cfg(feature = "runtime")]
attacher: self.attacher.try_clone(&inner)?,
inner,
})
Ok(Self { inner })
}

/// Queries metadata about the underlying file.
pub fn metadata(&self) -> io::Result<Metadata> {
self.inner.metadata()
unsafe { self.inner.get_unchecked() }.metadata()
}

#[cfg(feature = "runtime")]
async fn sync_impl(&self, datasync: bool) -> io::Result<()> {
self.attach()?;
let op = Sync::new(self.as_raw_fd(), datasync);
let op = Sync::new(self.try_as_raw_fd()?, datasync);
Runtime::current().submit(op).await.0?;
Ok(())
}
Expand All @@ -99,7 +86,6 @@ impl File {
///
/// This function will attempt to ensure that all in-memory data reaches the
/// filesystem before returning.
#[cfg(feature = "runtime")]
pub async fn sync_all(&self) -> io::Result<()> {
self.sync_impl(false).await
}
Expand All @@ -115,17 +101,15 @@ impl File {
/// [`sync_all`].
///
/// [`sync_all`]: File::sync_all
#[cfg(feature = "runtime")]
pub async fn sync_data(&self) -> io::Result<()> {
self.sync_impl(true).await
}
}

#[cfg(feature = "runtime")]
impl AsyncReadAt for File {
async fn read_at<T: IoBufMut>(&self, buffer: T, pos: u64) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = ReadAt::new(self.as_raw_fd(), pos, buffer);
let (fd, buffer) = buf_try!(self.try_as_raw_fd(), buffer);
let op = ReadAt::new(fd, pos, buffer);
Runtime::current()
.submit(op)
.await
Expand All @@ -139,8 +123,8 @@ impl AsyncReadAt for File {
buffer: T,
pos: u64,
) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = ReadVectoredAt::new(self.as_raw_fd(), pos, buffer);
let (fd, buffer) = buf_try!(self.try_as_raw_fd(), buffer);
let op = ReadVectoredAt::new(fd, pos, buffer);
Runtime::current()
.submit(op)
.await
Expand All @@ -149,7 +133,6 @@ impl AsyncReadAt for File {
}
}

#[cfg(feature = "runtime")]
impl AsyncWriteAt for File {
#[inline]
async fn write_at<T: IoBuf>(&mut self, buf: T, pos: u64) -> BufResult<usize, T> {
Expand All @@ -167,11 +150,10 @@ impl AsyncWriteAt for File {
}
}

#[cfg(feature = "runtime")]
impl AsyncWriteAt for &File {
async fn write_at<T: IoBuf>(&mut self, buffer: T, pos: u64) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = WriteAt::new(self.as_raw_fd(), pos, buffer);
let (fd, buffer) = buf_try!(self.try_as_raw_fd(), buffer);
let op = WriteAt::new(fd, pos, buffer);
Runtime::current().submit(op).await.into_inner()
}

Expand All @@ -181,41 +163,12 @@ impl AsyncWriteAt for &File {
buffer: T,
pos: u64,
) -> BufResult<usize, T> {
let ((), buffer) = buf_try!(self.attach(), buffer);
let op = WriteVectoredAt::new(self.as_raw_fd(), pos, buffer);
let (fd, buffer) = buf_try!(self.try_as_raw_fd(), buffer);
let op = WriteVectoredAt::new(fd, pos, buffer);
Runtime::current().submit(op).await.into_inner()
}
}

impl AsRawFd for File {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}

impl FromRawFd for File {
unsafe fn from_raw_fd(fd: RawFd) -> Self {
Self {
inner: FromRawFd::from_raw_fd(fd),
#[cfg(feature = "runtime")]
attacher: compio_runtime::Attacher::new(),
}
}
}
impl_try_as_raw_fd!(File, inner);

impl IntoRawFd for File {
fn into_raw_fd(self) -> RawFd {
self.inner.into_raw_fd()
}
}

#[cfg(feature = "runtime")]
impl Attachable for File {
fn attach(&self) -> io::Result<()> {
self.attacher.attach(self)
}

fn is_attached(&self) -> bool {
self.attacher.is_attached()
}
}
impl_attachable!(File, inner);
2 changes: 0 additions & 2 deletions compio-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
mod file;
pub use file::*;

#[cfg(feature = "runtime")]
mod open_options;
#[cfg(feature = "runtime")]
pub use open_options::*;

#[cfg(windows)]
Expand Down
Loading