diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index ec1679f87d4..877433c6e16 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -2,6 +2,10 @@ - Update dependencies. +- Support stream IDs of up to 60 bit length. See [PR 2094] for details. + +[PR 2094]: https://github.com/libp2p/rust-libp2p/pull/2094 + # 0.28.0 [2021-03-17] - Update dependencies. diff --git a/muxers/mplex/src/codec.rs b/muxers/mplex/src/codec.rs index a1112223bfb..23ba1fcb632 100644 --- a/muxers/mplex/src/codec.rs +++ b/muxers/mplex/src/codec.rs @@ -47,7 +47,7 @@ pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024; /// > the corresponding local ID has the role `Endpoint::Listener`. #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub struct LocalStreamId { - num: u32, + num: u64, role: Endpoint, } @@ -63,7 +63,7 @@ impl fmt::Display for LocalStreamId { impl Hash for LocalStreamId { #![allow(clippy::derive_hash_xor_eq)] fn hash(&self, state: &mut H) { - state.write_u32(self.num); + state.write_u64(self.num); } } @@ -76,17 +76,17 @@ impl nohash_hasher::IsEnabled for LocalStreamId {} /// [`RemoteStreamId::into_local()`]. #[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)] pub struct RemoteStreamId { - num: u32, + num: u64, role: Endpoint, } impl LocalStreamId { - pub fn dialer(num: u32) -> Self { + pub fn dialer(num: u64) -> Self { Self { num, role: Endpoint::Dialer } } #[cfg(test)] - pub fn listener(num: u32) -> Self { + pub fn listener(num: u64) -> Self { Self { num, role: Endpoint::Listener } } @@ -107,11 +107,11 @@ impl LocalStreamId { } impl RemoteStreamId { - fn dialer(num: u32) -> Self { + fn dialer(num: u64) -> Self { Self { num, role: Endpoint::Dialer } } - fn listener(num: u32) -> Self { + fn listener(num: u64) -> Self { Self { num, role: Endpoint::Listener } } @@ -146,15 +146,15 @@ impl Frame { } pub struct Codec { - varint_decoder: codec::Uvi, + varint_decoder: codec::Uvi, decoder_state: CodecDecodeState, } #[derive(Debug, Clone)] enum CodecDecodeState { Begin, - HasHeader(u32), - HasHeaderAndLen(u32, usize), + HasHeader(u64), + HasHeaderAndLen(u64, usize), Poisoned, } @@ -210,7 +210,7 @@ impl Decoder for Codec { } let buf = src.split_to(len); - let num = (header >> 3) as u32; + let num = (header >> 3) as u64; let out = match header & 7 { 0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) }, 1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() }, @@ -305,4 +305,29 @@ mod tests { let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data }; assert!(enc.encode(ok_msg, &mut out).is_ok()); } + + #[test] + fn test_60bit_stream_id() { + // Create new codec object for encoding and decoding our frame. + let mut codec = Codec::new(); + // Create a u64 stream ID. + let id: u64 = u32::MAX as u64 + 1 ; + let stream_id = LocalStreamId { num: id, role: Endpoint::Dialer }; + + // Open a new frame with that stream ID. + let original_frame = Frame::Open { stream_id }; + + // Encode that frame. + let mut enc_frame = BytesMut::new(); + codec.encode(original_frame, &mut enc_frame) + .expect("Encoding to succeed."); + + // Decode encoded frame and extract stream ID. + let dec_string_id = codec.decode(&mut enc_frame) + .expect("Decoding to succeed.") + .map(|f| f.remote_id()) + .unwrap(); + + assert_eq!(dec_string_id.num, stream_id.num); + } } diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs index d750b6bd83f..e4e49935b8d 100644 --- a/muxers/mplex/src/io.rs +++ b/muxers/mplex/src/io.rs @@ -1093,7 +1093,7 @@ mod tests { // Open the maximum number of inbound streams. for i in 0 .. cfg.max_substreams { - let stream_id = LocalStreamId::dialer(i as u32); + let stream_id = LocalStreamId::dialer(i as u64); codec.encode(Frame::Open { stream_id }, &mut r_buf).unwrap(); } @@ -1115,7 +1115,7 @@ mod tests { Poll::Pending => panic!("Expected new inbound stream."), Poll::Ready(Err(e)) => panic!("{:?}", e), Poll::Ready(Ok(id)) => { - assert_eq!(id, LocalStreamId::listener(i as u32)); + assert_eq!(id, LocalStreamId::listener(i as u64)); } }; } @@ -1162,7 +1162,7 @@ mod tests { MaxBufferBehaviour::Block => { assert!(m.poll_next_stream(cx).is_pending()); for i in 1 .. cfg.max_substreams { - let id = LocalStreamId::listener(i as u32); + let id = LocalStreamId::listener(i as u64); assert!(m.poll_read_stream(cx, id).is_pending()); } }