Skip to content

Commit

Permalink
Update.
Browse files Browse the repository at this point in the history
  • Loading branch information
xOS committed Apr 24, 2022
1 parent 321dda8 commit cc450a6
Show file tree
Hide file tree
Showing 22 changed files with 698 additions and 469 deletions.
19 changes: 14 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
[package]
name = "realm"
version = "2.1.4"
version = "2.1.5"
authors = ["zhboner <zhboner@gmail.com>"]
edition = "2021"

[workspace]
members = ["realm_io"]

[lib]
name = "realm"
path = "src/lib.rs"
Expand All @@ -13,6 +16,11 @@ name = "realm"
path = "src/bin.rs"

[dependencies]
# realm
realm_io = { path = "realm_io" }
kaminari = { version = "0.7.0", optional = true }

# common
cfg-if = "1"
futures = "0.3"
log = "0.4"
Expand All @@ -31,10 +39,6 @@ trust-dns-resolver = { version = "0.20", optional = true }
pin-project = "1"
lazy_static = "1"

# transport
kaminari = { version = "0.6.0", optional = true }


# tfo
tokio-tfo = { git = "https://github.com/zephyrchien/tokio-tfo", branch = "main", version = "0.1.9", optional = true }

Expand All @@ -55,7 +59,6 @@ jemallocator = {version = "0.3", optional = true }
[target.'cfg(unix)'.dependencies]
daemonize = "0.4"


[profile.release]
opt-level = 3
lto = true
Expand Down
18 changes: 18 additions & 0 deletions realm_io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "realm_io"
version = "0.2.2"
authors = ["zephyr <i@zephyr.moe>"]
description = "Realm's high performance IO collections"
repository = "https://github.com/zhboner/realm/realm_io"
readme = "README.md"
documentation = "https://docs.rs/realm_io"
keywords = ["network", "zero-copy", "relay"]
edition = "2021"
license = "MIT"

[dependencies]
libc = "0.2"
tokio = "1.9"

[target.'cfg(unix)'.dependencies]
tokio = { version = "1.9", features = ["net"] }
3 changes: 3 additions & 0 deletions realm_io/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Realm IO

Realm's high performance IO collections.
162 changes: 162 additions & 0 deletions realm_io/src/bidi_copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use std::future::Future;

use tokio::io::{AsyncRead, AsyncWrite};

use super::{AsyncIOBuf, CopyBuffer};

enum TransferState<B, SR, SW> {
Running(CopyBuffer<B, SR, SW>),
ShuttingDown,
Done,
}

fn transfer<B, SL, SR>(
cx: &mut Context<'_>,
state: &mut TransferState<B, SL, SR>,
r: &mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamR,
w: &mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamW,
amt: &mut u64,
) -> Poll<Result<()>>
where
B: Unpin,
SL: AsyncRead + AsyncWrite + Unpin,
SR: AsyncRead + AsyncWrite + Unpin,
CopyBuffer<B, SL, SR>: AsyncIOBuf,
CopyBuffer<B, SR, SL>: AsyncIOBuf,
{
loop {
match state {
TransferState::Running(buf) => {
ready!(buf.poll_copy(cx, r, w, amt))?;

*state = TransferState::ShuttingDown;
}
TransferState::ShuttingDown => {
ready!(Pin::new(&mut *w).poll_shutdown(cx))?;

*state = TransferState::Done;
}
TransferState::Done => return Poll::Ready(Ok(())),
}
}
}

fn transfer2<B, SL, SR>(
cx: &mut Context<'_>,
state: &mut TransferState<B, SR, SL>, // reverse
r: &mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamW,
w: &mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamR,
amt: &mut u64,
) -> Poll<Result<()>>
where
B: Unpin,
SL: AsyncRead + AsyncWrite + Unpin,
SR: AsyncRead + AsyncWrite + Unpin,
CopyBuffer<B, SL, SR>: AsyncIOBuf,
CopyBuffer<B, SR, SL>: AsyncIOBuf,
{
// type equality constraints will save this (one day)!
let r: &mut <CopyBuffer<B, SR, SL> as AsyncIOBuf>::StreamR =
unsafe { std::mem::transmute(r) };
let w: &mut <CopyBuffer<B, SR, SL> as AsyncIOBuf>::StreamW =
unsafe { std::mem::transmute(w) };
loop {
match state {
TransferState::Running(buf) => {
ready!(buf.poll_copy(cx, r, w, amt))?;

*state = TransferState::ShuttingDown;
}
TransferState::ShuttingDown => {
ready!(Pin::new(&mut *w).poll_shutdown(cx))?;

*state = TransferState::Done;
}
TransferState::Done => return Poll::Ready(Ok(())),
}
}
}

struct BidiCopy<'a, B, SL, SR>
where
B: Unpin,
SL: AsyncRead + AsyncWrite + Unpin,
SR: AsyncRead + AsyncWrite + Unpin,
CopyBuffer<B, SL, SR>: AsyncIOBuf + Unpin,
CopyBuffer<B, SR, SL>: AsyncIOBuf + Unpin,
{
a: &'a mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamR,
b: &'a mut <CopyBuffer<B, SL, SR> as AsyncIOBuf>::StreamW,
a_to_b: TransferState<B, SL, SR>,
b_to_a: TransferState<B, SR, SL>,
ab_amt: &'a mut u64,
ba_amt: &'a mut u64,
}

impl<'a, B, SL, SR> Future for BidiCopy<'a, B, SL, SR>
where
B: Unpin,
SL: AsyncRead + AsyncWrite + Unpin,
SR: AsyncRead + AsyncWrite + Unpin,
CopyBuffer<B, SL, SR>: AsyncIOBuf,
CopyBuffer<B, SR, SL>: AsyncIOBuf,
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Unpack self into mut refs to each field to avoid borrow check issues.
let BidiCopy {
a,
b,
a_to_b,
b_to_a,
ab_amt,
ba_amt,
} = self.get_mut();

let a_to_b = transfer(cx, a_to_b, a, b, ab_amt)?;
let b_to_a = transfer2::<B, SL, SR>(cx, b_to_a, b, a, ba_amt)?;

// It is not a problem if ready! returns early because transfer_one_direction for the
// other direction will keep returning TransferState::Done(count) in future calls to poll
ready!(a_to_b);
ready!(b_to_a);

Poll::Ready(Ok(()))
}
}

pub async fn bidi_copy_buf<B, SR, SW>(
a: &mut <CopyBuffer<B, SR, SW> as AsyncIOBuf>::StreamR,
b: &mut <CopyBuffer<B, SR, SW> as AsyncIOBuf>::StreamW,
a_to_b_buf: CopyBuffer<B, SR, SW>,
b_to_a_buf: CopyBuffer<B, SW, SR>,
) -> (Result<()>, u64, u64)
where
B: Unpin,
SR: AsyncRead + AsyncWrite + Unpin,
SW: AsyncRead + AsyncWrite + Unpin,
CopyBuffer<B, SR, SW>: AsyncIOBuf + Unpin,
CopyBuffer<B, SW, SR>: AsyncIOBuf + Unpin,
{
let a_to_b = TransferState::Running(a_to_b_buf);
let b_to_a = TransferState::Running(b_to_a_buf);

let mut ab_amt = 0;
let mut ba_amt = 0;

let res = BidiCopy {
a,
b,
a_to_b,
b_to_a,
ab_amt: &mut ab_amt,
ba_amt: &mut ba_amt,
}
.await;

(res, ab_amt, ba_amt)
}
Loading

0 comments on commit cc450a6

Please sign in to comment.