Skip to content

Commit

Permalink
minor fixes in comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Tejas Sanap committed Jun 28, 2021
1 parent 2b1b5d0 commit ead5d97
Showing 1 changed file with 150 additions and 80 deletions.
230 changes: 150 additions & 80 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::{BufMut, Bytes, BytesMut};
use asynchronous_codec::{Decoder, Encoder};
use bytes::{BufMut, Bytes, BytesMut};
use libp2p_core::Endpoint;
use std::{fmt, hash::{Hash, Hasher}, io, mem};
use std::{
fmt,
hash::{Hash, Hasher},
io, mem,
};
use unsigned_varint::{codec, encode};

// Maximum size for a packet: 1MB as per the spec.
Expand Down Expand Up @@ -82,18 +86,27 @@ pub struct RemoteStreamId {

impl LocalStreamId {
pub fn dialer(num: u64) -> Self {
Self { num, role: Endpoint::Dialer }
Self {
num,
role: Endpoint::Dialer,
}
}

#[cfg(test)]
pub fn listener(num: u64) -> Self {
Self { num, role: Endpoint::Listener }
Self {
num,
role: Endpoint::Listener,
}
}

pub fn next(self) -> Self {
Self {
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
.. self
num: self
.num
.checked_add(1)
.expect("Mplex substream ID overflowed"),
..self
}
}

Expand All @@ -108,11 +121,17 @@ impl LocalStreamId {

impl RemoteStreamId {
fn dialer(num: u64) -> Self {
Self { num, role: Endpoint::Dialer }
Self {
num,
role: Endpoint::Dialer,
}
}

fn listener(num: u64) -> Self {
Self { num, role: Endpoint::Listener }
Self {
num,
role: Endpoint::Listener,
}
}

/// Converts this `RemoteStreamId` into the corresponding `LocalStreamId`
Expand Down Expand Up @@ -174,31 +193,28 @@ impl Decoder for Codec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
loop {
match mem::replace(&mut self.decoder_state, CodecDecodeState::Poisoned) {
CodecDecodeState::Begin => {
match self.varint_decoder.decode(src)? {
Some(header) => {
self.decoder_state = CodecDecodeState::HasHeader(header);
},
None => {
self.decoder_state = CodecDecodeState::Begin;
return Ok(None);
},
CodecDecodeState::Begin => match self.varint_decoder.decode(src)? {
Some(header) => {
self.decoder_state = CodecDecodeState::HasHeader(header);
}
None => {
self.decoder_state = CodecDecodeState::Begin;
return Ok(None);
}
},
CodecDecodeState::HasHeader(header) => {
match self.varint_decoder.decode(src)? {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
},
None => {
self.decoder_state = CodecDecodeState::HasHeader(header);
return Ok(None);
},
CodecDecodeState::HasHeader(header) => match self.varint_decoder.decode(src)? {
Some(len) => {
if len as usize > MAX_FRAME_SIZE {
let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}

self.decoder_state =
CodecDecodeState::HasHeaderAndLen(header, len as usize);
}
None => {
self.decoder_state = CodecDecodeState::HasHeader(header);
return Ok(None);
}
},
CodecDecodeState::HasHeaderAndLen(header, len) => {
Expand All @@ -212,25 +228,44 @@ impl Decoder for Codec {
let buf = src.split_to(len);
let num = (header >> 3) as u64;
let out = match header & 7 {
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
2 => Frame::Data { stream_id: RemoteStreamId::dialer(num), data: buf.freeze() },
3 => Frame::Close { stream_id: RemoteStreamId::listener(num) },
4 => Frame::Close { stream_id: RemoteStreamId::dialer(num) },
5 => Frame::Reset { stream_id: RemoteStreamId::listener(num) },
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
0 => Frame::Open {
stream_id: RemoteStreamId::dialer(num),
},
1 => Frame::Data {
stream_id: RemoteStreamId::listener(num),
data: buf.freeze(),
},
2 => Frame::Data {
stream_id: RemoteStreamId::dialer(num),
data: buf.freeze(),
},
3 => Frame::Close {
stream_id: RemoteStreamId::listener(num),
},
4 => Frame::Close {
stream_id: RemoteStreamId::dialer(num),
},
5 => Frame::Reset {
stream_id: RemoteStreamId::listener(num),
},
6 => Frame::Reset {
stream_id: RemoteStreamId::dialer(num),
},
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
},
}
};

self.decoder_state = CodecDecodeState::Begin;
return Ok(Some(out));
},
}

CodecDecodeState::Poisoned => {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Mplex codec poisoned"));
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Mplex codec poisoned",
));
}
}
}
Expand All @@ -243,27 +278,51 @@ impl Encoder for Codec {

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Frame::Open { stream_id } => {
(u64::from(stream_id.num) << 3, Bytes::new())
},
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Listener }, data } => {
(u64::from(num) << 3 | 1, data)
},
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Dialer }, data } => {
(u64::from(num) << 3 | 2, data)
},
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 3, Bytes::new())
},
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 4, Bytes::new())
},
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 5, Bytes::new())
},
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 6, Bytes::new())
},
Frame::Open { stream_id } => (u64::from(stream_id.num) << 3, Bytes::new()),
Frame::Data {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
data,
} => (u64::from(num) << 3 | 1, data),
Frame::Data {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
data,
} => (u64::from(num) << 3 | 2, data),
Frame::Close {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
} => (u64::from(num) << 3 | 3, Bytes::new()),
Frame::Close {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
} => (u64::from(num) << 3 | 4, Bytes::new()),
Frame::Reset {
stream_id:
LocalStreamId {
num,
role: Endpoint::Listener,
},
} => (u64::from(num) << 3 | 5, Bytes::new()),
Frame::Reset {
stream_id:
LocalStreamId {
num,
role: Endpoint::Dialer,
},
} => (u64::from(num) << 3 | 6, Bytes::new()),
};

let mut header_buf = encode::u64_buffer();
Expand All @@ -274,7 +333,10 @@ impl Encoder for Codec {
let data_len_bytes = encode::usize(data_len, &mut data_buf);

if data_len > MAX_FRAME_SIZE {
return Err(io::Error::new(io::ErrorKind::InvalidData, "data size exceed maximum"));
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"data size exceed maximum",
));
}

dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
Expand All @@ -294,44 +356,52 @@ mod tests {
let mut enc = Codec::new();
let role = Endpoint::Dialer;
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE + 1][..]);
let bad_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let bad_msg = Frame::Data {
stream_id: LocalStreamId { num: 123, role },
data,
};
let mut out = BytesMut::new();
match enc.encode(bad_msg, &mut out) {
Err(e) => assert_eq!(e.to_string(), "data size exceed maximum"),
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE")
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE"),
}

let data = Bytes::from(&[123u8; MAX_FRAME_SIZE][..]);
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let ok_msg = Frame::Data {
stream_id: LocalStreamId { num: 123, role },
data,
};
assert!(enc.encode(ok_msg, &mut out).is_ok());
}

#[test]
fn test_60bit_stream_id() {
// create new codec object for encoding and decoding our frame
// Create new codec object for encoding and decoding our frame.
let mut codec = Codec::new();
// create a u64 stream ID
let id : u64 = u32::MAX as u64 + 1 ;
let stream_id = LocalStreamId { num: id, role: Endpoint::Dialer };

// open a new frame with that stream ID
// Create a u64 stream ID.
let id: u64 = u32::MAX as u64 + 1;
let stream_id = LocalStreamId {
num: id,
role: Endpoint::Dialer,
};

// Open a new frame with that stream ID.
let original_frame = Frame::Open { stream_id };

// encode that frame
// Encode that frame.
let mut enc_frame = BytesMut::new();
match codec.encode(original_frame, &mut enc_frame) {
Err(e) => panic!("{}", e.to_string()),
_ => {}
};

// decode encoded frame and extract stream ID
let dec_string_id = match codec.decode(&mut enc_frame) {
Err(e) => panic!("{}", e.to_string()),
Ok(decoded_frame) => decoded_frame.unwrap().remote_id(),
};
// Decode encoded frame and extract stream ID.
let dec_string_id = codec.decode(&mut enc_frame)
.expect("Decoding to succeed.")
.map(|f| f.remote_id())
.unwrap();

assert_eq!(dec_string_id.num, stream_id.num);

}

}

0 comments on commit ead5d97

Please sign in to comment.