Skip to content

Commit

Permalink
reformatting code
Browse files Browse the repository at this point in the history
  • Loading branch information
Tejas Sanap committed Jun 28, 2021
1 parent ead5d97 commit a016222
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 144 deletions.
4 changes: 4 additions & 0 deletions muxers/mplex/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.39.0 [unreleased]

- Increased the lenght of Stream ID to 60 bits.

# 0.29.0 [unreleased]

- Update dependencies.
Expand Down
214 changes: 70 additions & 144 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use asynchronous_codec::{Decoder, Encoder};
use bytes::{BufMut, Bytes, BytesMut};
use asynchronous_codec::{Decoder, Encoder};
use libp2p_core::Endpoint;
use std::{
fmt,
hash::{Hash, Hasher},
io, mem,
};
use std::{fmt, hash::{Hash, Hasher}, io, mem};
use unsigned_varint::{codec, encode};

// Maximum size for a packet: 1MB as per the spec.
Expand Down Expand Up @@ -86,27 +82,18 @@ pub struct RemoteStreamId {

impl LocalStreamId {
pub fn dialer(num: u64) -> Self {
Self {
num,
role: Endpoint::Dialer,
}
Self { num, role: Endpoint::Dialer }
}

#[cfg(test)]
pub fn listener(num: u64) -> Self {
Self {
num,
role: Endpoint::Listener,
}
Self { num, role: Endpoint::Listener }
}

pub fn next(self) -> Self {
Self {
num: self
.num
.checked_add(1)
.expect("Mplex substream ID overflowed"),
..self
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
.. self
}
}

Expand All @@ -121,17 +108,11 @@ impl LocalStreamId {

impl RemoteStreamId {
fn dialer(num: u64) -> Self {
Self {
num,
role: Endpoint::Dialer,
}
Self { num, role: Endpoint::Dialer }
}

fn listener(num: u64) -> Self {
Self {
num,
role: Endpoint::Listener,
}
Self { num, role: Endpoint::Listener }
}

/// Converts this `RemoteStreamId` into the corresponding `LocalStreamId`
Expand Down Expand Up @@ -193,28 +174,31 @@ impl Decoder for Codec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.decoder_state, CodecDecodeState::Poisoned) {
CodecDecodeState::Begin => match self.varint_decoder.decode(src)? {
Some(header) => {
self.decoder_state = CodecDecodeState::HasHeader(header);
}
None => {
self.decoder_state = CodecDecodeState::Begin;
return Ok(None);
CodecDecodeState::Begin => {
match self.varint_decoder.decode(src)? {
Some(header) => {
self.decoder_state = CodecDecodeState::HasHeader(header);
},
None => {
self.decoder_state = CodecDecodeState::Begin;
return Ok(None);
},
}
},
CodecDecodeState::HasHeader(header) => match self.varint_decoder.decode(src)? {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state =
CodecDecodeState::HasHeaderAndLen(header, len as usize);
}
None => {
self.decoder_state = CodecDecodeState::HasHeader(header);
return Ok(None);
CodecDecodeState::HasHeader(header) => {
match self.varint_decoder.decode(src)? {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
},
None => {
self.decoder_state = CodecDecodeState::HasHeader(header);
return Ok(None);
},
}
},
CodecDecodeState::HasHeaderAndLen(header, len) => {
Expand All @@ -228,44 +212,25 @@ impl Decoder for Codec {
let buf = src.split_to(len);
let num = (header >> 3) as u64;
let out = match header & 7 {
0 => Frame::Open {
stream_id: RemoteStreamId::dialer(num),
},
1 => Frame::Data {
stream_id: RemoteStreamId::listener(num),
data: buf.freeze(),
},
2 => Frame::Data {
stream_id: RemoteStreamId::dialer(num),
data: buf.freeze(),
},
3 => Frame::Close {
stream_id: RemoteStreamId::listener(num),
},
4 => Frame::Close {
stream_id: RemoteStreamId::dialer(num),
},
5 => Frame::Reset {
stream_id: RemoteStreamId::listener(num),
},
6 => Frame::Reset {
stream_id: RemoteStreamId::dialer(num),
},
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
2 => Frame::Data { stream_id: RemoteStreamId::dialer(num), data: buf.freeze() },
3 => Frame::Close { stream_id: RemoteStreamId::listener(num) },
4 => Frame::Close { stream_id: RemoteStreamId::dialer(num) },
5 => Frame::Reset { stream_id: RemoteStreamId::listener(num) },
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
},
};

self.decoder_state = CodecDecodeState::Begin;
return Ok(Some(out));
}
},

CodecDecodeState::Poisoned => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Mplex codec poisoned",
));
return Err(io::Error::new(io::ErrorKind::InvalidData, "Mplex codec poisoned"));
}
}
}
Expand All @@ -278,51 +243,27 @@ impl Encoder for Codec {

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Frame::Open { stream_id } => (u64::from(stream_id.num) << 3, Bytes::new()),
Frame::Data {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
data,
} => (u64::from(num) << 3 | 1, data),
Frame::Data {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
data,
} => (u64::from(num) << 3 | 2, data),
Frame::Close {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
} => (u64::from(num) << 3 | 3, Bytes::new()),
Frame::Close {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
} => (u64::from(num) << 3 | 4, Bytes::new()),
Frame::Reset {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
} => (u64::from(num) << 3 | 5, Bytes::new()),
Frame::Reset {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
} => (u64::from(num) << 3 | 6, Bytes::new()),
Frame::Open { stream_id } => {
(u64::from(stream_id.num) << 3, Bytes::new())
},
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Listener }, data } => {
(u64::from(num) << 3 | 1, data)
},
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Dialer }, data } => {
(u64::from(num) << 3 | 2, data)
},
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 3, Bytes::new())
},
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 4, Bytes::new())
},
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 5, Bytes::new())
},
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 6, Bytes::new())
},
};

let mut header_buf = encode::u64_buffer();
Expand All @@ -333,10 +274,7 @@ impl Encoder for Codec {
let data_len_bytes = encode::usize(data_len, &mut data_buf);

if data_len > MAX_FRAME_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"data size exceed maximum",
));
return Err(io::Error::new(io::ErrorKind::InvalidData, "data size exceed maximum"));
}

dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
Expand All @@ -356,45 +294,33 @@ mod tests {
let mut enc = Codec::new();
let role = Endpoint::Dialer;
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE + 1][..]);
let bad_msg = Frame::Data {
stream_id: LocalStreamId { num: 123, role },
data,
};
let bad_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let mut out = BytesMut::new();
match enc.encode(bad_msg, &mut out) {
Err(e) => assert_eq!(e.to_string(), "data size exceed maximum"),
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE"),
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE")
}

let data = Bytes::from(&[123u8; MAX_FRAME_SIZE][..]);
let ok_msg = Frame::Data {
stream_id: LocalStreamId { num: 123, role },
data,
};
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
assert!(enc.encode(ok_msg, &mut out).is_ok());
}

#[test]
fn test_60bit_stream_id() {
// Create new codec object for encoding and decoding our frame.
let mut codec = Codec::new();

// Create a u64 stream ID.
let id: u64 = u32::MAX as u64 + 1;
let stream_id = LocalStreamId {
num: id,
role: Endpoint::Dialer,
};
let id: u64 = u32::MAX as u64 + 1 ;
let stream_id = LocalStreamId { num: id, role: Endpoint::Dialer };

// Open a new frame with that stream ID.
let original_frame = Frame::Open { stream_id };

// Encode that frame.
let mut enc_frame = BytesMut::new();
match codec.encode(original_frame, &mut enc_frame) {
Err(e) => panic!("{}", e.to_string()),
_ => {}
};
codec.encode(original_frame, &mut enc_frame)
.expect("Encoding to succeed.");

// Decode encoded frame and extract stream ID.
let dec_string_id = codec.decode(&mut enc_frame)
Expand Down

0 comments on commit a016222

Please sign in to comment.