lint: fix lint-err (safety, split-unsafe, scoping)
kp-omer-shamash committed Feb 10, 2025
1 parent a431437 commit 8f49bde
Showing 2 changed files with 149 additions and 107 deletions.
2 changes: 1 addition & 1 deletion lightway-app-utils/examples/udprelay.rs
@@ -2,7 +2,7 @@ use self::channel::Channel;

use lightway_app_utils::IOUring;

use anyhow::{Error, Result};
use anyhow::Result;
use async_channel::{bounded, Receiver, Sender};
use bytes::BytesMut;
use clap::Parser;
254 changes: 148 additions & 106 deletions lightway-app-utils/src/iouring.rs
@@ -25,12 +25,14 @@ enum IOUringActionID {
}
const RX_BUFFER_GROUP: u16 = 0xdead;

/// Wrapper for raw pointer that guarantees Send + Sync safety
/// Safety: The underlying memory is owned by the Arc'd BufferPool and outlives any pointer usage
/// A wrapper around a raw pointer that guarantees thread safety through Arc ownership
struct BufferPtr(*mut u8);

#[allow(unsafe_code)]
// Safety: The pointer is owned by Arc<BufferPool> which ensures exclusive access
unsafe impl Send for BufferPtr {}
#[allow(unsafe_code)]
// Safety: The pointer is owned by Arc<BufferPool> which ensures synchronized access
unsafe impl Sync for BufferPtr {}
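(For readers unfamiliar with the pattern above: a raw pointer is neither `Send` nor `Sync`, so sharing one across threads requires a newtype with explicit `unsafe impl`s whose invariants are documented. A minimal, self-contained sketch of the idea — the `RawPtr` name and `Arc<Vec<u8>>` stand-in are illustrative, not from this commit:)

```rust
use std::sync::Arc;

/// Illustrative stand-in for BufferPtr: the pointee is owned by an Arc'd
/// allocation, so the pointer cannot dangle while the Arc is alive.
struct RawPtr(*mut u8);

// Safety: the memory behind the pointer is owned by an Arc'd buffer that
// outlives every RawPtr, and all access is externally synchronized.
unsafe impl Send for RawPtr {}
unsafe impl Sync for RawPtr {}

fn main() {
    let pool: Arc<Vec<u8>> = Arc::new(vec![0u8; 16]);
    let ptr = RawPtr(pool.as_ptr() as *mut u8);
    let keepalive = Arc::clone(&pool);
    std::thread::spawn(move || {
        // `ptr` may cross the thread boundary only because of the impls above.
        let _ = ptr.0;
        drop(keepalive); // the Arc keeps the pointee alive on this thread
    })
    .join()
    .unwrap();
}
```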

impl BufferPtr {
@@ -39,6 +41,7 @@ impl BufferPtr {
}
}

/// A pool of buffers with an underlying contiguous memory block
struct BufferPool {
data: Vec<u8>, // contiguous block of memory (all buffers)
lengths: Vec<AtomicUsize>,
@@ -62,6 +65,7 @@ impl BufferPool {
}

fn get_buffer(&self, idx: usize) -> (BufferPtr, &AtomicUsize, &AtomicBool) {
// Safety: Index is bounds-checked by the caller, and buffer_size ensures no overflow
#[allow(unsafe_code)]
let ptr = unsafe { self.data.as_ptr().add(idx * self.buffer_size) as *mut u8 };

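For intuition, here is a reduced, safe model of the contiguous-pool layout that `get_buffer` indexes into: buffer `i` starts at `i * buffer_size` inside one `Vec<u8>`, so a single allocation backs every buffer. The `Pool` type below is hypothetical:

```rust
/// Hypothetical, safe model of the pool layout used by `get_buffer`.
struct Pool {
    data: Vec<u8>, // one contiguous allocation backing all buffers
    buffer_size: usize,
}

impl Pool {
    fn new(count: usize, buffer_size: usize) -> Self {
        Self { data: vec![0u8; count * buffer_size], buffer_size }
    }

    /// Buffer `idx` occupies `[idx * buffer_size, (idx + 1) * buffer_size)`.
    /// Slicing panics on out-of-range indices instead of reading past the end.
    fn buffer_mut(&mut self, idx: usize) -> &mut [u8] {
        let start = idx * self.buffer_size;
        &mut self.data[start..start + self.buffer_size]
    }
}

fn main() {
    let mut pool = Pool::new(4, 16);
    pool.buffer_mut(2)[0] = 0xAB;
    assert_eq!(pool.buffer_mut(2)[0], 0xAB);
}
```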
@@ -81,6 +85,7 @@ pub struct IOUring<T: AsRawFd> {
submission_lock: Arc<Mutex<()>>,
}

// Safety: IOUring implementation does direct memory manipulation for performance benefits
#[allow(unsafe_code)]
impl<T: AsRawFd> IOUring<T> {
/// Create `IOUring` struct
@@ -113,45 +118,61 @@ impl<T: AsRawFd> IOUring<T> {

// We can provide the buffers without a lock, as we still haven't shared the ownership
let fd = owned_fd.as_raw_fd();
unsafe {
let mut sq = ring.submission_shared();
sq.push(
&opcode::ProvideBuffers::new(
rx_pool.data.as_ptr() as *mut u8,
mtu as i32,
ring_size as u16,
RX_BUFFER_GROUP,
0,
)
.build()
.user_data(IOUringActionID::RecycleBuffers as u64),
)?;
sq.push(
&opcode::RecvMulti::new(types::Fd(fd), RX_BUFFER_GROUP)

// Scope submission-queue operations to avoid borrowing ring
{
// Safety: Ring submission can be used without locks at this point
let mut sq = unsafe { ring.submission_shared() };

// Safety: Buffer memory is owned by rx_pool and outlives the usage
unsafe {
sq.push(
&opcode::ProvideBuffers::new(
rx_pool.data.as_ptr() as *mut u8,
mtu as i32,
ring_size as u16,
RX_BUFFER_GROUP,
0,
)
.build()
.user_data(IOUringActionID::ReceivedBuffer as u64),
)?;

// A bit inefficient vs. calculating the offset directly, but more maintainable
let tx_iovecs: Vec<_> = (0..ring_size)
.map(|idx| {
let (ptr, _, _) = tx_pool.get_buffer(idx);
iovec {
iov_base: ptr.as_ptr() as *mut libc::c_void,
iov_len: mtu,
}
})
.collect();
ring.submitter().register_buffers(&tx_iovecs)?;
.user_data(IOUringActionID::RecycleBuffers as u64),
)?
};

// Safety: Ring is initialized and file descriptor is valid
unsafe {
sq.push(
&opcode::RecvMulti::new(types::Fd(fd), RX_BUFFER_GROUP)
.build()
.user_data(IOUringActionID::ReceivedBuffer as u64),
)?
};
}
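This hunk shows the commit's central pattern (the "split-unsafe" in the title): one broad `unsafe { .. }` block is split so that each unsafe operation sits in its own narrowly scoped block with its own `// Safety:` justification. A standalone sketch of the before/after shape, using hypothetical helpers rather than the real io_uring calls:

```rust
/// Hypothetical unsafe helpers standing in for the io_uring submissions above.
unsafe fn write_byte(ptr: *mut u8, val: u8) {
    // Safety: the caller guarantees `ptr` is valid for writes.
    unsafe { *ptr = val };
}

unsafe fn read_byte(ptr: *const u8) -> u8 {
    // Safety: the caller guarantees `ptr` is valid for reads.
    unsafe { *ptr }
}

fn main() {
    let mut buf = [0u8; 1];
    let ptr = buf.as_mut_ptr();

    // Instead of one `unsafe { write_byte(..); read_byte(..) }` block with a
    // single catch-all comment, each operation gets its own scope and reason.

    // Safety: `ptr` points into `buf`, which is live and writable here.
    unsafe { write_byte(ptr, 42) };

    // Safety: `ptr` still points into `buf`; the write above initialized it.
    let v = unsafe { read_byte(ptr) };
    assert_eq!(v, 42);
}
```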

let rx_pool_clone = rx_pool.clone();
let tx_pool_clone = tx_pool.clone();
let ring_clone = ring.clone();
let lock_clone = submission_lock.clone();
let notify_clone = rx_notify.clone();
let eventfd = rx_eventfd.as_raw_fd();
let provide_buffers = rx_provide_buffers.clone();
// A bit inefficient vs. calculating the offset directly, but more maintainable
let tx_iovecs: Vec<_> = (0..ring_size)
.map(|idx| {
let (ptr, _, _) = tx_pool.get_buffer(idx);
iovec {
iov_base: ptr.as_ptr() as *mut libc::c_void,
iov_len: mtu,
}
})
.collect();

// Safety: tx_iovecs point to valid memory owned by tx_pool
unsafe { ring.submitter().register_buffers(&tx_iovecs)? };

let config = IOUringTaskConfig {
tun_fd: fd,
rx_pool: rx_pool.clone(),
tx_pool: tx_pool.clone(),
rx_notify: rx_notify.clone(),
rx_eventfd: rx_eventfd.as_raw_fd(),
rx_provide_buffers: rx_provide_buffers.clone(),
ring: ring.clone(),
submission_lock: submission_lock.clone(),
};

thread::Builder::new()
.name("io_uring-main".to_string())
@@ -160,16 +181,7 @@
.enable_io()
.build()
.expect("Failed building Tokio Runtime")
.block_on(iouring_task(
fd,
rx_pool_clone,
tx_pool_clone,
notify_clone,
eventfd,
provide_buffers,
ring_clone,
lock_clone,
))
.block_on(iouring_task(config))
})?;

Ok(Self {
@@ -222,6 +234,7 @@ impl<T: AsRawFd> IOUring<T> {
return IOCallbackResult::WouldBlock;
}

// Safety: Buffer is allocated with sufficient size and ownership is checked via state
unsafe { std::slice::from_raw_parts_mut(buffer.as_ptr(), len).copy_from_slice(&buf) };
length.store(len, Ordering::Release);

@@ -238,11 +251,16 @@ impl<T: AsRawFd> IOUring<T> {
.user_data(idx as u64);

// Safely queue submission
let _guard = self.submission_lock.lock();
unsafe {
match self.ring.submission_shared().push(&write_op) {
Ok(_) => IOCallbackResult::Ok(len),
Err(_) => IOCallbackResult::WouldBlock,
{
let _guard = self.submission_lock.lock();
// Safety: protected by lock above
let mut sq = unsafe { self.ring.submission_shared() };
// Safety: entry uses buffers from tx_pool which outlive the task using them
unsafe {
match sq.push(&write_op) {
Ok(_) => IOCallbackResult::Ok(len),
Err(_) => IOCallbackResult::WouldBlock,
}
}
}
}
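The `_guard`-then-`submission_shared()` sequence above is the standard way to make an API that is unsafe only because of unsynchronized access safe in practice: a `Mutex<()>` serializes all submitters. A reduced sketch with an illustrative `Shared` type (not the real io_uring types; std's `Mutex` stands in for whatever lock the crate uses):

```rust
use std::cell::UnsafeCell;
use std::sync::Mutex;

/// Stand-in for the submission queue: not thread-safe on its own.
struct Shared {
    queue: UnsafeCell<Vec<u64>>,
    lock: Mutex<()>,
}

// Safety: every access to `queue` happens while holding `lock` (see `push`).
unsafe impl Sync for Shared {}

impl Shared {
    fn push(&self, v: u64) {
        let _guard = self.lock.lock().unwrap();
        // Safety: the guard above gives this thread exclusive access.
        unsafe { (*self.queue.get()).push(v) };
    }
}

fn main() {
    let shared = Shared { queue: UnsafeCell::new(Vec::new()), lock: Mutex::new(()) };
    shared.push(7);
    // Safety: `main` is single-threaded at this point; no concurrent access.
    assert_eq!(unsafe { (*shared.queue.get()).len() }, 1);
}
```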
@@ -282,6 +300,8 @@ impl<T: AsRawFd> IOUring<T> {
{
let len = length.load(Ordering::Acquire);
let mut new_buf = BytesMut::with_capacity(len);

// Safety: Buffer is allocated with sufficient size and ownership is checked via state
unsafe {
new_buf.extend_from_slice(std::slice::from_raw_parts(buffer.as_ptr(), len))
};
@@ -296,8 +316,9 @@ impl<T: AsRawFd> IOUring<T> {
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
let val = 1u64;
// Safety: buffer is defined on stack, event-fd outlives task using it
unsafe {
let val = 1u64;
if libc::write(
self.rx_eventfd.as_raw_fd(),
&val as *const u64 as *const _,
@@ -315,8 +336,8 @@ impl<T: AsRawFd> IOUring<T> {
}
}

#[allow(unsafe_code)]
async fn iouring_task(
/// Variables and shared state passed to the io_uring task
struct IOUringTaskConfig {
tun_fd: RawFd,
rx_pool: Arc<BufferPool>,
tx_pool: Arc<BufferPool>,
@@ -325,28 +346,38 @@ async fn iouring_task(
rx_provide_buffers: Arc<AtomicBool>,
ring: Arc<IoUring>,
submission_lock: Arc<Mutex<()>>,
) -> Result<()> {
}
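Bundling the eight task arguments into `IOUringTaskConfig` is the usual remedy for clippy's `too_many_arguments` lint. A reduced, hypothetical version of the pattern:

```rust
/// Hypothetical reduced version of the refactor: related parameters move
/// into one struct so the task signature stays stable as fields are added.
struct TaskConfig {
    fd: i32,
    ring_size: usize,
    name: &'static str,
}

fn run_task(config: TaskConfig) {
    // Fields are reached through the struct, making call sites self-documenting.
    println!("{}: fd={} ring_size={}", config.name, config.fd, config.ring_size);
}

fn main() {
    run_task(TaskConfig {
        fd: 3,
        ring_size: 1024,
        name: "io_uring-main",
    });
}
```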

// Safety: Managing ring completions and results efficiently requires direct memory manipulation
#[allow(unsafe_code)]
async fn iouring_task(config: IOUringTaskConfig) -> Result<()> {
let mut eventfd_buf = [0u64; 1]; // Buffer for eventfd read (8 bytes)

// Submit initial read for eventfd (needs to be here so the buffer lives on the task's stack)
unsafe {
let _guard = submission_lock.lock();
let mut sq = ring.submission_shared();
sq.push(
&opcode::Read::new(
types::Fd(rx_eventfd),
eventfd_buf.as_mut_ptr() as *mut u8,
8,
)
.build()
.user_data(IOUringActionID::RecyclePending as u64),
)?;
{
let _guard = config.submission_lock.lock();
// Safety: protected by above lock
let mut sq = unsafe { config.ring.submission_shared() };
// Safety: event-fd outlives the task, queue protected by lock
unsafe {
sq.push(
&opcode::Read::new(
types::Fd(config.rx_eventfd),
eventfd_buf.as_mut_ptr() as *mut u8,
8,
)
.build()
.user_data(IOUringActionID::RecyclePending as u64),
)?
};
}
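The eventfd handshake here (user code writes `1u64`; the ring holds a pending `Read` that completes and wakes the task) relies on standard eventfd(2) semantics: writes add to a kernel-side counter and a read drains it. A minimal sketch of that write/read pair using the `libc` crate, which this file already depends on; error handling is elided:

```rust
fn main() {
    // Safety: eventfd(2) takes an initial counter value and flags; 0/0 is valid.
    let efd = unsafe { libc::eventfd(0, 0) };
    assert!(efd >= 0, "eventfd failed");

    let val: u64 = 1;
    // Safety: `val` is a live 8-byte stack value for the duration of the call.
    let written = unsafe { libc::write(efd, &val as *const u64 as *const libc::c_void, 8) };
    assert_eq!(written, 8);

    let mut out: u64 = 0;
    // Safety: `out` is a valid, writable 8-byte buffer.
    let read = unsafe { libc::read(efd, &mut out as *mut u64 as *mut libc::c_void, 8) };
    assert_eq!(read, 8);
    assert_eq!(out, 1); // the read drains the counter back to zero
}
```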

loop {
ring.submit_and_wait(1)?;
// Block until we have at least one completion to process
config.ring.submit_and_wait(1)?;

for cqe in unsafe { ring.completion_shared() } {
// Safety: only this task uses the completion queue (an invariant that should not change)
for cqe in unsafe { config.ring.completion_shared() } {
match cqe.user_data() {
x if x == IOUringActionID::RecycleBuffers as u64 => {
// Buffer provision completed
@@ -359,39 +390,50 @@ async fn iouring_task(
// NOTE: This approach works well for cases with constant data flow:
// we can only reload the buffers for the kernel once our read threads are done with the existing data,
// so if our read threads blocked for too long elsewhere it would back-pressure the NIC device
let _guard = submission_lock.lock();
unsafe {
let mut sq = ring.submission_shared();

// Make sure kernel can use all buffers again
sq.push(
&opcode::ProvideBuffers::new(
rx_pool.data.as_ptr() as *mut u8,
rx_pool.buffer_size as i32,
rx_pool.states.len() as u16,
RX_BUFFER_GROUP,
0,
)
.build()
.user_data(IOUringActionID::RecycleBuffers as u64),
)?;

sq.push(
&opcode::RecvMulti::new(types::Fd(tun_fd), RX_BUFFER_GROUP)
let _guard = config.submission_lock.lock();

// Safety: protected by above lock
let mut sq = unsafe { config.ring.submission_shared() };

// Make sure kernel can use all buffers again
{
// Safety: buffers are mapped from rx_pool which outlives this task
unsafe {
sq.push(
&opcode::ProvideBuffers::new(
config.rx_pool.data.as_ptr() as *mut u8,
config.rx_pool.buffer_size as i32,
config.rx_pool.states.len() as u16,
RX_BUFFER_GROUP,
0,
)
.build()
.user_data(IOUringActionID::RecycleBuffers as u64),
)?
};
// Safety: buffer-group originates from rx_pool which outlives this task
unsafe {
sq.push(
&opcode::RecvMulti::new(
types::Fd(config.tun_fd),
RX_BUFFER_GROUP,
)
.build()
.user_data(IOUringActionID::ReceivedBuffer as u64),
)?;

// Resubmit eventfd read
sq.push(
&opcode::Read::new(
types::Fd(rx_eventfd),
eventfd_buf.as_mut_ptr() as *mut u8,
8,
)
.build()
.user_data(IOUringActionID::RecyclePending as u64),
)?;
)?
};
// Safety: Event-fd outlives the task, buffer is task-bound (stack)
unsafe {
sq.push(
&opcode::Read::new(
types::Fd(config.rx_eventfd),
eventfd_buf.as_mut_ptr() as *mut u8,
8,
)
.build()
.user_data(IOUringActionID::RecyclePending as u64),
)?
};
}
}
}
@@ -408,14 +450,14 @@ async fn iouring_task(
}

let buf_id = io_uring::cqueue::buffer_select(cqe.flags()).unwrap();
let (_, length, state) = rx_pool.get_buffer(buf_id as _);
let (_, length, state) = config.rx_pool.get_buffer(buf_id as _);

length.store(result as usize, Ordering::Release);
state.store(true, Ordering::Release); // Mark as ready-for-user
rx_notify.notify_waiters();
config.rx_notify.notify_waiters();

if !io_uring::cqueue::more(cqe.flags()) {
rx_provide_buffers.store(true, Ordering::Release);
config.rx_provide_buffers.store(true, Ordering::Release);
}
}

@@ -429,7 +471,7 @@ async fn iouring_task(
);
metrics::tun_iouring_tx_err();
}
let (_, _, state) = tx_pool.get_buffer(idx as _);
let (_, _, state) = config.tx_pool.get_buffer(idx as _);
state.store(false, Ordering::Release); // mark as available for send
}
}
