Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

muxers/mplex: Allow up to 60 bit long stream IDs #2094

Merged
merged 8 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions muxers/mplex/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

- Update dependencies.

- Support stream IDs of up to 60 bit length. See [PR 2094] for details.

[PR 2094]: https://github.com/libp2p/rust-libp2p/pull/2094

# 0.28.0 [2021-03-17]

- Update dependencies.
Expand Down
47 changes: 36 additions & 11 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub struct LocalStreamId {
num: u32,
num: u64,
role: Endpoint,
}

Expand All @@ -63,7 +63,7 @@ impl fmt::Display for LocalStreamId {
impl Hash for LocalStreamId {
#![allow(clippy::derive_hash_xor_eq)]
fn hash<H: Hasher>(&self, state: &mut H) {
state.write_u32(self.num);
state.write_u64(self.num);
}
}

Expand All @@ -76,17 +76,17 @@ impl nohash_hasher::IsEnabled for LocalStreamId {}
/// [`RemoteStreamId::into_local()`].
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct RemoteStreamId {
num: u32,
num: u64,
role: Endpoint,
}

impl LocalStreamId {
pub fn dialer(num: u32) -> Self {
pub fn dialer(num: u64) -> Self {
Self { num, role: Endpoint::Dialer }
}

#[cfg(test)]
pub fn listener(num: u32) -> Self {
pub fn listener(num: u64) -> Self {
Self { num, role: Endpoint::Listener }
}

Expand All @@ -107,11 +107,11 @@ impl LocalStreamId {
}

impl RemoteStreamId {
fn dialer(num: u32) -> Self {
fn dialer(num: u64) -> Self {
Self { num, role: Endpoint::Dialer }
}

fn listener(num: u32) -> Self {
fn listener(num: u64) -> Self {
Self { num, role: Endpoint::Listener }
}

Expand Down Expand Up @@ -146,15 +146,15 @@ impl Frame<RemoteStreamId> {
}

pub struct Codec {
varint_decoder: codec::Uvi<u32>,
varint_decoder: codec::Uvi<u64>,
decoder_state: CodecDecodeState,
}

#[derive(Debug, Clone)]
enum CodecDecodeState {
Begin,
HasHeader(u32),
HasHeaderAndLen(u32, usize),
HasHeader(u64),
HasHeaderAndLen(u64, usize),
Poisoned,
}

Expand Down Expand Up @@ -210,7 +210,7 @@ impl Decoder for Codec {
}

let buf = src.split_to(len);
let num = (header >> 3) as u32;
let num = (header >> 3) as u64;
let out = match header & 7 {
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
Expand Down Expand Up @@ -305,4 +305,29 @@ mod tests {
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
assert!(enc.encode(ok_msg, &mut out).is_ok());
}

#[test]
fn test_60bit_stream_id() {
// Create new codec object for encoding and decoding our frame.
let mut codec = Codec::new();
// Create a u64 stream ID.
let id: u64 = u32::MAX as u64 + 1 ;
let stream_id = LocalStreamId { num: id, role: Endpoint::Dialer };

// Open a new frame with that stream ID.
let original_frame = Frame::Open { stream_id };

// Encode that frame.
let mut enc_frame = BytesMut::new();
codec.encode(original_frame, &mut enc_frame)
.expect("Encoding to succeed.");

// Decode encoded frame and extract stream ID.
let dec_string_id = codec.decode(&mut enc_frame)
.expect("Decoding to succeed.")
.map(|f| f.remote_id())
.unwrap();

assert_eq!(dec_string_id.num, stream_id.num);
}
}
6 changes: 3 additions & 3 deletions muxers/mplex/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ mod tests {

// Open the maximum number of inbound streams.
for i in 0 .. cfg.max_substreams {
let stream_id = LocalStreamId::dialer(i as u32);
let stream_id = LocalStreamId::dialer(i as u64);
codec.encode(Frame::Open { stream_id }, &mut r_buf).unwrap();
}

Expand All @@ -1115,7 +1115,7 @@ mod tests {
Poll::Pending => panic!("Expected new inbound stream."),
Poll::Ready(Err(e)) => panic!("{:?}", e),
Poll::Ready(Ok(id)) => {
assert_eq!(id, LocalStreamId::listener(i as u32));
assert_eq!(id, LocalStreamId::listener(i as u64));
}
};
}
Expand Down Expand Up @@ -1162,7 +1162,7 @@ mod tests {
MaxBufferBehaviour::Block => {
assert!(m.poll_next_stream(cx).is_pending());
for i in 1 .. cfg.max_substreams {
let id = LocalStreamId::listener(i as u32);
let id = LocalStreamId::listener(i as u64);
assert!(m.poll_read_stream(cx, id).is_pending());
}
}
Expand Down