Skip to content

Commit

Permalink
feat: update docs and example
Browse files Browse the repository at this point in the history
  • Loading branch information
oyyd committed Jan 13, 2024
1 parent b9623c0 commit 97a77fa
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 85 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,14 @@ tokio = { version = "1.35", features = [
] }
dashmap = "5.5"
log = "0.4"


[[example]]
name = "client"

[[example]]
name = "server"


[dev-dependencies]
tokio = { version = "1.35", features = ["rt-multi-thread"] }
61 changes: 10 additions & 51 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,68 +2,27 @@

[![Build And Test](https://github.com/oyyd/tokio_smux/actions/workflows/build_and_test.yml/badge.svg)](https://github.com/oyyd/tokio_smux/actions) [![crates.io](https://img.shields.io/crates/v/tokio_smux.svg)](https://crates.io/crates/tokio_smux)

tokio_smux is an implementation of [smux](https://github.com/xtaci/smux/) in Rust, which is a stream multiplexing library in Golang.
Tokio_smux is an implementation of [smux](https://github.com/xtaci/smux/) in Rust, which is a stream multiplexing library in Golang.

tokio_smux is originally written to work with tokio [TcpStream](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html) and [KcpStream](https://docs.rs/tokio_kcp/latest/tokio_kcp/struct.KcpStream.html). It can also be used with any streams implement tokio `AsyncRead` and `AsyncWrite`.

## Smux Protocol Implementation Status

Smux protocl version 2 is not supported yet.
Tokio_smux can work with tokio [TcpStream](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html) and [KcpStream](https://docs.rs/tokio_kcp/latest/tokio_kcp/struct.KcpStream.html). It can also be used with any streams that implement tokio's `AsyncRead` and `AsyncWrite` traits.

## Docs

[https://docs.rs/tokio_smux](https://docs.rs/tokio_smux)

## Usage Example

Client side:
```rust
use anyhow::Result;
use tokio::net::{TcpListener, TcpStream};
use tokio_smux::{Session, SmuxConfig, Stream};

async fn client() -> Result<()> {
let client = TcpStream::connect("127.0.0.1:3100").await?;

let mut session = Session::new_client(client, SmuxConfig::default())?;
See [examples](./examples)

let stream = session.open_stream().await?;

let data: Vec<u8> = vec![0; 65535];
stream.send_message(data).await?;

let msg = stream.recv_message().await?;
}
```

Server side:
```rust
use tokio::net::{TcpListener, TcpStream};
use tokio_smux::{Session, SmuxConfig, Stream};
## Smux Protocol Implementation Status

async fn server() -> Result<()> {
let listener = TcpListener::bind("0.0.0.0:3100").await?;
loop {
let (client, _) = listener.accept().await?;
let session = Session::new_server(client, SmuxConfig::default())?;
The smux protocl version 2 is not yet supported.

tokio::spawn(async move {
loop {
let mut stream = session.accept_stream().await.unwrap();
println!("[server] accept stream {}", stream.sid());
## Why doesn't `Stream` of tokio_smux impl `AsyncRead` and `AysncWrite` itself?

tokio::spawn(async move {
let data = stream.recv_message().await.unwrap();
if data.is_none() {
println!("[server] stream fin {}", stream.sid());
return
}
Becuase the smux protocol uses frames, means all user data transfered is wrapped in frames of fixed lengths. Similar to the websocket protocol.

println!("[serveri] receive client data len {}", data.unwrap().len())
});
}
});
}
}
Using frames has its benifits: it maintains message boundaries.
For example, you can send many one byte data messages to the remote, and receive them one by one in the remote.

```
It's still feasible to wrap the current APIs of `Stream` to provide `AsyncRead` and `AsyncWrite`. However, this approach introduces additional overhead and loses message boundaries.
23 changes: 23 additions & 0 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use tokio::net::TcpStream;
use tokio_smux::{Session, SmuxConfig};

#[tokio::main]
async fn main() {
let client = TcpStream::connect("127.0.0.1:3100").await.unwrap();

let mut session = Session::client(client, SmuxConfig::default()).unwrap();

let mut stream = session.open_stream().await.unwrap();

let data: Vec<u8> = vec![0; 65535];
stream.send_message(data).await.unwrap();

let msg = stream.recv_message().await.unwrap();

if msg.is_none() {
println!("stream fin");
return;
}

println!("receive {} bytes", msg.unwrap().len())
}
28 changes: 28 additions & 0 deletions examples/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use tokio::net::TcpListener;
use tokio_smux::{Session, SmuxConfig};

#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:3100").await.unwrap();
loop {
let (client, _) = listener.accept().await.unwrap();
let mut session = Session::server(client, SmuxConfig::default()).unwrap();

tokio::spawn(async move {
loop {
let mut stream = session.accept_stream().await.unwrap();
println!("[server] accept stream {}", stream.sid());

tokio::spawn(async move {
let data = stream.recv_message().await.unwrap();
if data.is_none() {
println!("[server] stream fin {}", stream.sid());
return;
}

println!("[serveri] receive client data len {}", data.unwrap().len())
});
}
});
}
}
23 changes: 12 additions & 11 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,27 @@ use crate::error::{Result, TokioSmuxError};
use core::time;
use std::time::Duration;

/// `Session` config.
pub struct SmuxConfig {
// SMUX Protocol version, support 1.
/// SMUX Protocol version, support 1.
pub version: u8,
// Disable keepalive.
/// Disable keepalive.
pub keep_alive_disable: bool,
// keep_alive_interval is how often to send a NOP command to the remote.
/// `keep_alive_interval` is how often to send a NOP command to the remote.
pub keep_alive_interval: Duration,

// **NOTE:** Not yet supported.
// KeepAliveTimeout is how long the session
// will be closed if no data has arrived.
/// **NOTE:** Not yet supported.
/// `keep_alive_timeout` is how long the session
/// will be closed if no data has arrived.
pub keep_alive_timeout: Duration,

// Max number of pending writing frames in queue.
// More writing frames operations will be blocked. Default: 4096.
/// Max number of pending writing frames in queue.
/// More writing frames operations will be blocked. Default: 4096.
pub writing_frame_channel_capacity: usize,

// Max number of pending reading frames in queue for each stream.
// More reading frames will be blocked until the frames in queue get consumed.
// Default: 1024.
/// Max number of pending reading frames in queue for each stream.
/// More reading frames will be blocked until the frames in queue get consumed.
/// Default: 1024.
pub stream_reading_frame_channel_capacity: usize,
}

Expand Down
14 changes: 8 additions & 6 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};

pub type Sid = u32;

/// Frame commands of smux protocal.
#[derive(Clone, Copy)]
pub enum Cmd {
// stream open
/// Stream open.
Sync,
// stream close, a.k.a EOF mark
/// Stream close, a.k.a EOF mark.
Fin,
// data push
/// Data push.
Psh,
// no operation
/// No operation.
Nop,
// protocol version 2 extra commands
// notify bytes consumed by remote peer-end
/// Protocol version 2 extra commands.
/// Notify bytes consumed by remote peer-end.
Udp,
}

Expand Down Expand Up @@ -140,6 +141,7 @@ impl Frame {
}))
}

#[allow(dead_code)]
pub fn from_buf_with_data(data: &[u8]) -> Result<Option<Self>> {
let frame = Frame::from_buf(data);
if frame.is_err() {
Expand Down
6 changes: 2 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//!
mod config;
mod error;
mod frame;
Expand All @@ -8,8 +6,8 @@ mod session;
mod session_inner;
mod stream;

pub use frame::Cmd;
pub use config::SmuxConfig;
pub use error::TokioSmuxError;
pub use frame::{Cmd, Frame};
pub use session::Session;
pub use stream::Stream;
pub use session::Session;
3 changes: 2 additions & 1 deletion src/read_frame_grouper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ impl ReadFrameGrouper {

#[cfg(test)]
mod test {
use crate::frame::{Cmd, Frame};
use crate::read_frame_grouper::ReadFrameGrouper;
use crate::session_inner::ReadRequest;
use crate::{read_frame_grouper::ReadFrameGrouper, Cmd, Frame};
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
Expand Down
12 changes: 6 additions & 6 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::sync::{mpsc, oneshot, Mutex};
const MAX_READ_REQ: usize = 4096;
const MAX_IN_QUEUE_SYNC_FRAMES: usize = 4096;

/// Session is used to manage the underlying stream and provide multiplexing abilites.
pub struct Session {
config: SmuxConfig,

Expand Down Expand Up @@ -48,14 +49,14 @@ impl Drop for Session {
}

impl Session {
pub fn new_client<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
pub fn client<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
conn: T,
config: SmuxConfig,
) -> Result<Self> {
return Session::new(conn, config, true);
}

pub fn new_server<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
pub fn server<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
conn: T,
config: SmuxConfig,
) -> Result<Self> {
Expand Down Expand Up @@ -131,10 +132,7 @@ impl Session {
Ok(session)
}

pub fn inner_keys(&self) -> i32 {
(self.sid_close_tx_map.len() + self.sid_tx_map.len() + self.sid_rx_map.len()) as i32
}

/// Get the underlying stream error, e.g. tcp connection failures.
pub async fn get_inner_err(&mut self) -> Option<TokioSmuxError> {
let inner_err = self.inner_err.lock().await;
if inner_err.is_some() {
Expand All @@ -152,6 +150,7 @@ impl Session {
return Err(err.unwrap());
}

/// Create a new `Stream` by sending a sync frame to the remote.
pub async fn open_stream(&mut self) -> Result<Stream> {
// Check if stream id overflows.
if self.go_away {
Expand Down Expand Up @@ -197,6 +196,7 @@ impl Session {
Ok(stream.unwrap())
}

/// Create a new `Stream` by receiving a sync frame from the remote.
pub async fn accept_stream(&mut self) -> Result<Stream> {
// Check inner error.
self.inner_ok().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/session_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::future;

use crate::error::Result;
use crate::frame::HEADER_SIZE;
use crate::{Cmd, Frame};
use crate::frame::{Cmd, Frame};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
use tokio::time;
Expand Down Expand Up @@ -212,10 +212,10 @@ impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> SessionInner<T> {
#[cfg(test)]
mod test {
use crate::frame::HEADER_SIZE;
use crate::frame::{Cmd, Frame};
use crate::session::test::MockAsyncStream;
use crate::session_inner::SessionInner;
use crate::session_inner::WriteRequest;
use crate::{Cmd, Frame};
use tokio::sync::mpsc;
use tokio::sync::oneshot;

Expand Down
Loading

0 comments on commit 97a77fa

Please sign in to comment.