Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TCP, UDP and Unix socket support #40

Merged
merged 16 commits into from
Feb 25, 2022
Merged

Conversation

LinkTed
Copy link
Contributor

@LinkTed LinkTed commented Aug 7, 2021

Add TCP, UDP and Unix socket support.

  • TCP
    • Stream
    • Listener
  • UDP socket
  • Unix
    • Stream
    • Listener
    • Datagram

Open to feedback

Sorry, something went wrong.

@LinkTed LinkTed marked this pull request as draft August 7, 2021 10:31
Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exposing these ops as async fns like this exposes some issues w/ cancellation. I have a proposal here: https://github.com/tokio-rs/tokio-uring/blob/design-doc/DESIGN.md#byte-streams. You interested in trying that?

}

impl TcpStream {
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should just take a SocketAddr here and avoid the lookup_host. A user could call the Tokio lookup_host themselves?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this would be better. It will reduce the complexity and make it clear which IP address the peer is connected.


let op = opcode::Connect::new(
types::Fd(connect.socket.as_raw_fd()),
connect.os_socket_addr.as_ptr(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The socket address will need to be stored on the heap to be safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, did not push the current. Yes, there was a use-after free. Maybe we should use a pin instead of Box<T> to not allocated memory on the heap (performance reason)?

@LinkTed
Copy link
Contributor Author

LinkTed commented Aug 10, 2021

Exposing these ops as async fns like this exposes some issues w/ cancellation. I have a proposal here: https://github.com/tokio-rs/tokio-uring/blob/design-doc/DESIGN.md#byte-streams. You interested in trying that?

Thanks for the feedback. Sure, I will try it on the weekend.

@LinkTed
Copy link
Contributor Author

LinkTed commented Aug 11, 2021

Exposing these ops as async fns like this exposes some issues w/ cancellation. I have a proposal here: https://github.com/tokio-rs/tokio-uring/blob/design-doc/DESIGN.md#byte-streams. You interested in trying that?
@carllerche
I have some questions about your "proposal":

  • Should the AsyncBufRead own the buffer or the driver? If yes, then what should happen if the AsyncBufRead drop then?
  • Should the Write and Read operation be kept or removed?
  • How big should the internal buffer of the AsyncBufRead be?

@johalun
Copy link

johalun commented Oct 19, 2021

Any progress on this lately? I'm very interesting in trying it out. Can test if conflicts are resolved.

@LinkTed
Copy link
Contributor Author

LinkTed commented Oct 21, 2021

@johalun Did not have to work on it, sorry.

@LinkTed
Copy link
Contributor Author

LinkTed commented Nov 1, 2021

@johalun I continue working on this PR. Currently, there is a bug in the function accept of the TcpListener. The addrlen is not set (https://github.com/tokio-rs/tokio-uring/pull/40/files#diff-b0bd49be618d67cc160c826497fb99a731ecd1ddbefa6e09b8d6c6439981114cR25). If this is fixed then the TCP code can be tested.

Maybe somebody knows why.

@Darksonn
Copy link

Sorry, I don't know.

@dyxushuai
Copy link

The accpet doc says that

       The addrlen argument is a value-result argument: the caller must
       initialize it to contain the size (in bytes) of the structure
       pointed to by addr; on return it will contain the actual size of
       the peer address.

@LinkTed
Copy link
Contributor Author

LinkTed commented Nov 28, 2021

The accpet doc says that

       The addrlen argument is a value-result argument: the caller must
       initialize it to contain the size (in bytes) of the structure
       pointed to by addr; on return it will contain the actual size of
       the peer address.

Yes, I know. I use the addrlen as a value-result argument. But what I am doing wrong?

@TiltMeSenpai
Copy link

I did a bit of digging, what I think might be happening is Rust is actually copying the Accept struct with sockaddr_storage and addrlen before submitting it to the io_uring queue.

I changed driver/accept.rs so that the Accept struct takes a sockaddr_storage/addrlen pointer instead of holding them itself, and updated driver/socket.rs so that the accept() function owns sockaddr_storage and addrlen. After the Accept future returns, sockaddr_storage and addrlen in accept() are updated with the correct values, however the values in completion.data remain unchanged for some reason.

This is probably a pretty hacky solution, but it does confirm my theory about Rust making random copies of your structs. I hope this helps.

diff --git a/src/driver/accept.rs b/src/driver/accept.rs
index c90db9d..a18a8da 100644
--- a/src/driver/accept.rs
+++ b/src/driver/accept.rs
@@ -4,25 +4,25 @@ use std::io;
 
 pub(crate) struct Accept {
     fd: SharedFd,
-    pub(crate) sockaddr_storage: libc::sockaddr_storage,
-    pub(crate) addrlen: libc::socklen_t,
+    pub(crate) sockaddr_storage: *mut libc::sockaddr_storage,
+    pub(crate) addrlen: *mut libc::socklen_t,
 }
 
 impl Op<Accept> {
-    pub(crate) fn accept(fd: &SharedFd) -> io::Result<Op<Accept>> {
+    pub(crate) fn accept(fd: &SharedFd, sockaddr: &mut libc::sockaddr_storage, addrlen: &mut libc::socklen_t) -> io::Result<Op<Accept>> {
         use io_uring::{opcode, types};
 
         Op::submit_with(
             Accept {
                 fd: fd.clone(),
-                sockaddr_storage: unsafe { std::mem::zeroed() },
-                addrlen: std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t,
+                sockaddr_storage: sockaddr,
+                addrlen: addrlen,
             },
             |accept| {
                 opcode::Accept::new(
                     types::Fd(accept.fd.raw_fd()),
-                    &mut accept.sockaddr_storage as *mut _ as *mut _,
-                    &mut accept.addrlen,
+                    accept.sockaddr_storage as *mut _,
+                    accept.addrlen,
                 )
                 .flags(libc::O_CLOEXEC)
                 .build()
diff --git a/src/driver/socket.rs b/src/driver/socket.rs
index 60f80cb..cfad2d4 100644
--- a/src/driver/socket.rs
+++ b/src/driver/socket.rs
@@ -39,15 +39,17 @@ impl Socket {
     }
 
     pub(crate) async fn accept(&self) -> io::Result<(Socket, SocketAddr)> {
-        let op = Op::accept(&self.fd)?;
+        let mut sockaddr: libc::sockaddr_storage = unsafe { std::mem::zeroed() };
+        let mut addrlen:  libc::socklen_t = std::mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
+        let op = Op::accept(&self.fd, &mut sockaddr, &mut addrlen)?;
         let completion = op.await;
         let fd = completion.result?;
         let fd = SharedFd::new(fd as i32);
         let socket = Socket { fd };
         let os_socket_addr = unsafe {
             OsSocketAddr::from_raw_parts(
-                &completion.data.sockaddr_storage as *const _ as _,
-                completion.data.addrlen as usize,
+                &sockaddr as *const _ as _,
+                addrlen as usize,
             )
         };
         let socket_addr = os_socket_addr.into_addr().unwrap();

@LinkTed
Copy link
Contributor Author

LinkTed commented Dec 12, 2021

@TiltMeSenpai Thanks for the patch. Yes, it helps. I will check, how we can fix this.

@LinkTed
Copy link
Contributor Author

LinkTed commented Dec 18, 2021

@TiltMeSenpai, yes it was the bug. I fix it by using an Box.

Currently, I am working to add the UDP support.

@LinkTed LinkTed force-pushed the master branch 3 times, most recently from 6655724 to 23c5a71 Compare December 22, 2021 17:55
@LinkTed
Copy link
Contributor Author

LinkTed commented Dec 22, 2021

I will add the Unix support in another PR because I think that this PR would be too big. Therefore, I will clean the current state and remove the draft state, so that the review process can start.

@Noah-Kennedy
Copy link
Contributor

I'll review this once you are ready. I think it would be great to get this merged in before I make a new release.

Add the `TcpStream`, `TcpListener` and `UdpSocket` structs.
@LinkTed LinkTed marked this pull request as ready for review December 23, 2021 15:22
@Noah-Kennedy
Copy link
Contributor

Noah-Kennedy commented Jan 6, 2022

If we don't need a bind operation, can we delete the file?

src/driver/op.rs Outdated
@@ -21,6 +21,7 @@ pub(crate) struct Op<T: 'static> {
}

/// Operation completion. Returns stored state with the result of the operation.
#[allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What dead code are we allowing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not add these sorts of statements then.

@Noah-Kennedy
Copy link
Contributor

@carllerche can you give this a re-review?

Noah-Kennedy and others added 4 commits February 13, 2022 16:21

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
use std::env;

use tokio_uring::{fs::File, net::TcpListener};

Copy link

@inevity inevity Feb 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should create a new test ,this test is for tokio type compatile consideration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mix.rs example jus show that tokio-uring runtime can use the tokio type from tokio crate,such as tokio::net and tokio::spawn.

@@ -146,7 +145,7 @@ where
Poll::Ready(Completion {
data: me.data.take().expect("unexpected operation state"),
result,
flags,
flags: flags,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add this back once we decide to use it. For now, let's not.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is just lint waring.

@@ -63,7 +62,7 @@ impl<T> Op<T> {
/// the kernel.
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Op<T>>
where
F: FnOnce() -> squeue::Entry,
F: FnOnce(&mut T) -> squeue::Entry,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why mut?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because submitting certain ops might mutate the stored data.

@inevity
Copy link

inevity commented Feb 17, 2022

Since we just use the old tokio current runtime to impl the tokio_uring runtime, So is there any benefit from using the tokio:uring TCP socket support? -------> replace the mio and tokio:spawn/await with tokio_uring::driver(get accept and place fd in the epoll) and uring sumbit?

@Noah-Kennedy
Copy link
Contributor

@inevity could you please rephrase that statement?

@inevity
Copy link

inevity commented Feb 18, 2022

@inevity could you please rephrase that statement?

just omit. It maybe have perf improvement.

@Noah-Kennedy Noah-Kennedy merged commit 523dae0 into tokio-rs:master Feb 25, 2022
@jon-chuang
Copy link
Contributor

Has this been tested with Hyper with HTTP/2 and its h2 backend?

Its worth noting that monoio's io_uring-based TcpStream is not compatible as a plugin to h2. See https://github.com/bytedance/monoio/tree/master/monoio-compat

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants