From 97a77fa5aa887a762e1009a85d7e0cc67f859e6b Mon Sep 17 00:00:00 2001 From: oyyd Date: Sat, 13 Jan 2024 19:16:03 +0800 Subject: [PATCH] feat: update docs and example --- Cargo.lock | 17 +++++++++++ Cargo.toml | 11 +++++++ README.md | 61 +++++++-------------------------------- examples/client.rs | 23 +++++++++++++++ examples/server.rs | 28 ++++++++++++++++++ src/config.rs | 23 ++++++++------- src/frame.rs | 14 +++++---- src/lib.rs | 6 ++-- src/read_frame_grouper.rs | 3 +- src/session.rs | 12 ++++---- src/session_inner.rs | 4 +-- src/stream.rs | 19 +++++++++--- 12 files changed, 136 insertions(+), 85 deletions(-) create mode 100644 examples/client.rs create mode 100644 examples/server.rs diff --git a/Cargo.lock b/Cargo.lock index 5b9d2ec..c116ca6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,6 +96,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + [[package]] name = "libc" version = "0.2.152" @@ -144,6 +150,16 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -274,6 +290,7 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", "pin-project-lite", "socket2", "tokio-macros", diff --git a/Cargo.toml b/Cargo.toml index 8a85c61..fcee508 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,14 @@ tokio = { version = "1.35", features = [ ] } dashmap = "5.5" log = "0.4" + + +[[example]] +name = "client" + +[[example]] +name = "server" + + +[dev-dependencies] +tokio = { version = "1.35", features = ["rt-multi-thread"] } diff --git a/README.md b/README.md index e238fd0..677b957 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,9 @@ [![Build And Test](https://github.com/oyyd/tokio_smux/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/oyyd/tokio_smux/actions) [![crates.io](https://img.shields.io/crates/v/tokio_smux.svg)](https://crates.io/crates/tokio_smux) -tokio_smux is an implementation of [smux](https://github.com/xtaci/smux/) in Rust, which is a stream multiplexing library in Golang. +Tokio_smux is an implementation of [smux](https://github.com/xtaci/smux/) in Rust, which is a stream multiplexing library in Golang. -tokio_smux is originally written to work with tokio [TcpStream](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html) and [KcpStream](https://docs.rs/tokio_kcp/latest/tokio_kcp/struct.KcpStream.html). It can also be used with any streams implement tokio `AsyncRead` and `AsyncWrite`. - -## Smux Protocol Implementation Status - -Smux protocl version 2 is not supported yet. +Tokio_smux can work with tokio [TcpStream](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html) and [KcpStream](https://docs.rs/tokio_kcp/latest/tokio_kcp/struct.KcpStream.html). It can also be used with any streams that implement tokio's `AsyncRead` and `AsyncWrite` traits. ## Docs @@ -16,54 +12,17 @@ Smux protocl version 2 is not supported yet. ## Usage Example -Client side: -```rust -use anyhow::Result; -use tokio::net::{TcpListener, TcpStream}; -use tokio_smux::{Session, SmuxConfig, Stream}; - -async fn client() -> Result<()> { - let client = TcpStream::connect("127.0.0.1:3100").await?; - - let mut session = Session::new_client(client, SmuxConfig::default())?; +See [examples](./examples) - let stream = session.open_stream().await?; - - let data: Vec = vec![0; 65535]; - stream.send_message(data).await?; - - let msg = stream.recv_message().await?; -} -``` - -Server side: -```rust -use tokio::net::{TcpListener, TcpStream}; -use tokio_smux::{Session, SmuxConfig, Stream}; +## Smux Protocol Implementation Status -async fn server() -> Result<()> { - let listener = TcpListener::bind("0.0.0.0:3100").await?; - loop { - let (client, _) = listener.accept().await?; - let session = Session::new_server(client, SmuxConfig::default())?; +The smux protocl version 2 is not yet supported. - tokio::spawn(async move { - loop { - let mut stream = session.accept_stream().await.unwrap(); - println!("[server] accept stream {}", stream.sid()); +## Why doesn't `Stream` of tokio_smux impl `AsyncRead` and `AysncWrite` itself? - tokio::spawn(async move { - let data = stream.recv_message().await.unwrap(); - if data.is_none() { - println!("[server] stream fin {}", stream.sid()); - return - } +Becuase the smux protocol uses frames, means all user data transfered is wrapped in frames of fixed lengths. Similar to the websocket protocol. - println!("[serveri] receive client data len {}", data.unwrap().len()) - }); - } - }); - } -} +Using frames has its benifits: it maintains message boundaries. +For example, you can send many one byte data messages to the remote, and receive them one by one in the remote. -``` +It's still feasible to wrap the current APIs of `Stream` to provide `AsyncRead` and `AsyncWrite`. However, this approach introduces additional overhead and loses message boundaries. diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 0000000..3adfd82 --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,23 @@ +use tokio::net::TcpStream; +use tokio_smux::{Session, SmuxConfig}; + +#[tokio::main] +async fn main() { + let client = TcpStream::connect("127.0.0.1:3100").await.unwrap(); + + let mut session = Session::client(client, SmuxConfig::default()).unwrap(); + + let mut stream = session.open_stream().await.unwrap(); + + let data: Vec = vec![0; 65535]; + stream.send_message(data).await.unwrap(); + + let msg = stream.recv_message().await.unwrap(); + + if msg.is_none() { + println!("stream fin"); + return; + } + + println!("receive {} bytes", msg.unwrap().len()) +} diff --git a/examples/server.rs b/examples/server.rs new file mode 100644 index 0000000..6028214 --- /dev/null +++ b/examples/server.rs @@ -0,0 +1,28 @@ +use tokio::net::TcpListener; +use tokio_smux::{Session, SmuxConfig}; + +#[tokio::main] +async fn main() { + let listener = TcpListener::bind("0.0.0.0:3100").await.unwrap(); + loop { + let (client, _) = listener.accept().await.unwrap(); + let mut session = Session::server(client, SmuxConfig::default()).unwrap(); + + tokio::spawn(async move { + loop { + let mut stream = session.accept_stream().await.unwrap(); + println!("[server] accept stream {}", stream.sid()); + + tokio::spawn(async move { + let data = stream.recv_message().await.unwrap(); + if data.is_none() { + println!("[server] stream fin {}", stream.sid()); + return; + } + + println!("[serveri] receive client data len {}", data.unwrap().len()) + }); + } + }); + } +} diff --git a/src/config.rs b/src/config.rs index f5ca795..4ff0f8f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,26 +2,27 @@ use crate::error::{Result, TokioSmuxError}; use core::time; use std::time::Duration; +/// `Session` config. pub struct SmuxConfig { - // SMUX Protocol version, support 1. + /// SMUX Protocol version, support 1. pub version: u8, - // Disable keepalive. + /// Disable keepalive. pub keep_alive_disable: bool, - // keep_alive_interval is how often to send a NOP command to the remote. + /// `keep_alive_interval` is how often to send a NOP command to the remote. pub keep_alive_interval: Duration, - // **NOTE:** Not yet supported. - // KeepAliveTimeout is how long the session - // will be closed if no data has arrived. + /// **NOTE:** Not yet supported. + /// `keep_alive_timeout` is how long the session + /// will be closed if no data has arrived. pub keep_alive_timeout: Duration, - // Max number of pending writing frames in queue. - // More writing frames operations will be blocked. Default: 4096. + /// Max number of pending writing frames in queue. + /// More writing frames operations will be blocked. Default: 4096. pub writing_frame_channel_capacity: usize, - // Max number of pending reading frames in queue for each stream. - // More reading frames will be blocked until the frames in queue get consumed. - // Default: 1024. + /// Max number of pending reading frames in queue for each stream. + /// More reading frames will be blocked until the frames in queue get consumed. + /// Default: 1024. pub stream_reading_frame_channel_capacity: usize, } diff --git a/src/frame.rs b/src/frame.rs index eeb1add..f8c1da8 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -5,18 +5,19 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; pub type Sid = u32; +/// Frame commands of smux protocal. #[derive(Clone, Copy)] pub enum Cmd { - // stream open + /// Stream open. Sync, - // stream close, a.k.a EOF mark + /// Stream close, a.k.a EOF mark. Fin, - // data push + /// Data push. Psh, - // no operation + /// No operation. Nop, - // protocol version 2 extra commands - // notify bytes consumed by remote peer-end + /// Protocol version 2 extra commands. + /// Notify bytes consumed by remote peer-end. Udp, } @@ -140,6 +141,7 @@ impl Frame { })) } + #[allow(dead_code)] pub fn from_buf_with_data(data: &[u8]) -> Result> { let frame = Frame::from_buf(data); if frame.is_err() { diff --git a/src/lib.rs b/src/lib.rs index fed96c5..0ac2b3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,3 @@ -//! - mod config; mod error; mod frame; @@ -8,8 +6,8 @@ mod session; mod session_inner; mod stream; +pub use frame::Cmd; pub use config::SmuxConfig; pub use error::TokioSmuxError; -pub use frame::{Cmd, Frame}; -pub use session::Session; pub use stream::Stream; +pub use session::Session; diff --git a/src/read_frame_grouper.rs b/src/read_frame_grouper.rs index 54dffcc..ad14d7e 100644 --- a/src/read_frame_grouper.rs +++ b/src/read_frame_grouper.rs @@ -83,8 +83,9 @@ impl ReadFrameGrouper { #[cfg(test)] mod test { + use crate::frame::{Cmd, Frame}; + use crate::read_frame_grouper::ReadFrameGrouper; use crate::session_inner::ReadRequest; - use crate::{read_frame_grouper::ReadFrameGrouper, Cmd, Frame}; use dashmap::DashMap; use std::sync::Arc; use tokio::sync::mpsc; diff --git a/src/session.rs b/src/session.rs index 45ffa7d..d94445c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -12,6 +12,7 @@ use tokio::sync::{mpsc, oneshot, Mutex}; const MAX_READ_REQ: usize = 4096; const MAX_IN_QUEUE_SYNC_FRAMES: usize = 4096; +/// Session is used to manage the underlying stream and provide multiplexing abilites. pub struct Session { config: SmuxConfig, @@ -48,14 +49,14 @@ impl Drop for Session { } impl Session { - pub fn new_client( + pub fn client( conn: T, config: SmuxConfig, ) -> Result { return Session::new(conn, config, true); } - pub fn new_server( + pub fn server( conn: T, config: SmuxConfig, ) -> Result { @@ -131,10 +132,7 @@ impl Session { Ok(session) } - pub fn inner_keys(&self) -> i32 { - (self.sid_close_tx_map.len() + self.sid_tx_map.len() + self.sid_rx_map.len()) as i32 - } - + /// Get the underlying stream error, e.g. tcp connection failures. pub async fn get_inner_err(&mut self) -> Option { let inner_err = self.inner_err.lock().await; if inner_err.is_some() { @@ -152,6 +150,7 @@ impl Session { return Err(err.unwrap()); } + /// Create a new `Stream` by sending a sync frame to the remote. pub async fn open_stream(&mut self) -> Result { // Check if stream id overflows. if self.go_away { @@ -197,6 +196,7 @@ impl Session { Ok(stream.unwrap()) } + /// Create a new `Stream` by receiving a sync frame from the remote. pub async fn accept_stream(&mut self) -> Result { // Check inner error. self.inner_ok().await?; diff --git a/src/session_inner.rs b/src/session_inner.rs index 9cbf186..9e935af 100644 --- a/src/session_inner.rs +++ b/src/session_inner.rs @@ -3,7 +3,7 @@ use std::future; use crate::error::Result; use crate::frame::HEADER_SIZE; -use crate::{Cmd, Frame}; +use crate::frame::{Cmd, Frame}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::{mpsc, oneshot}; use tokio::time; @@ -212,10 +212,10 @@ impl SessionInner { #[cfg(test)] mod test { use crate::frame::HEADER_SIZE; + use crate::frame::{Cmd, Frame}; use crate::session::test::MockAsyncStream; use crate::session_inner::SessionInner; use crate::session_inner::WriteRequest; - use crate::{Cmd, Frame}; use tokio::sync::mpsc; use tokio::sync::oneshot; diff --git a/src/stream.rs b/src/stream.rs index 536de08..3ea5c7c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,10 +3,9 @@ use crate::frame::{Cmd, Frame, Sid}; use crate::session_inner::WriteRequest; use tokio::sync::{mpsc, oneshot}; -// Three possible closing scenarioes: -// 1. close by remote fin message (should not send fin to remote) -// 2. close by session (should send fin to remote) -// 3. close by stream (should send fin to remote) +/// Use `Stream` to read data or write data from the remote. +/// +/// `Stream` is created by calling `Session::open_stream()` or `Session::accept_stream()`. pub struct Stream { sid: Sid, @@ -21,6 +20,10 @@ pub struct Stream { closed: bool, } +// Three possible closing scenarioes: +// 1. close by remote fin message (should not send fin to remote) +// 2. close by session (should send fin to remote) +// 3. close by stream (should send fin to remote) impl Drop for Stream { fn drop(&mut self) { let sid = self.sid; @@ -69,6 +72,7 @@ impl Stream { } } + // Get stream id. pub fn sid(&self) -> Sid { self.sid } @@ -77,6 +81,10 @@ impl Stream { self.drop_tx = drop_tx; } + /// Send a data frame to the remote. + /// + /// Since the max data size of a frame is `u16::MAX`, the `data.len()` should be smaller or equal to + /// `u16::MAX`, or an error will be returned. pub async fn send_message(&mut self, data: Vec) -> Result<()> { if data.len() > (u16::MAX as usize) { return Err(TokioSmuxError::StreamWriteTooLargeData); @@ -107,6 +115,9 @@ impl Stream { Ok(()) } + /// Receive a data frame from the remote. + /// + /// Returning `Ok(None)` means the stream has received fin and gets closed by the remote. pub async fn recv_message(&mut self) -> Result>> { // We don't check close_rx here because we still allow the stream to consume // rest data.