Skip to content

Commit

Permalink
codec: rewrite of codec::Framed (#2368)
Browse files Browse the repository at this point in the history
Framed was designed to encapsulate both AsyncRead and AsyncWrite so
that it could wrap two-way connections. It used Fuse to manage the pinned
io object between the FramedWrite and FramedRead structs.

I replaced the Fuse struct by isolating the state used in reading and
writing, and making the code generic over that instead. This means
the FramedImpl struct now has a parameter for the state, and contains
the logic for both directions. The Framed* structs are now simply
wrappers around this type

Hopefully removing the `Pin` handling made things easier to
understand, too.
  • Loading branch information
Plecra authored May 12, 2020
1 parent 1cc0168 commit 221f421
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 554 deletions.
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,15 @@ RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features
```
The `cargo fmt` command does not work on the Tokio codebase. You can use the
command below instead:

#### Bash
```
rustfmt --check --edition 2018 $(find . -name '*.rs' -print)
```
#### Powershell
```
Get-ChildItem . -Filter "*.rs" -Recurse | foreach { rustfmt --check --edition 2018 $_.FullName }
```
The `--check` argument prints the things that need to be fixed. If you remove
it, `rustfmt` will update your files locally instead.

Expand Down
208 changes: 47 additions & 161 deletions tokio-util/src/codec/framed.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::codec::decoder::Decoder;
use crate::codec::encoder::Encoder;
use crate::codec::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
use crate::codec::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};

use tokio::{
io::{AsyncBufRead, AsyncRead, AsyncWrite},
io::{AsyncRead, AsyncWrite},
stream::Stream,
};

use bytes::BytesMut;
use futures_sink::Sink;
use pin_project_lite::pin_project;
use std::fmt;
use std::io::{self, BufRead, Read, Write};
use std::mem::MaybeUninit;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand All @@ -30,37 +28,7 @@ pin_project! {
/// [`Decoder::framed`]: crate::codec::Decoder::framed()
pub struct Framed<T, U> {
#[pin]
inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
}
}

pin_project! {
pub(crate) struct Fuse<T, U> {
#[pin]
pub(crate) io: T,
pub(crate) codec: U,
}
}

/// Abstracts over `FramedRead2` being either `FramedRead2<FramedWrite2<Fuse<T, U>>>` or
/// `FramedRead2<Fuse<T, U>>` and lets the io and codec parts be extracted in either case.
pub(crate) trait ProjectFuse {
type Io;
type Codec;

fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec>;
}

impl<T, U> ProjectFuse for Fuse<T, U> {
type Io = T;
type Codec = U;

fn project(self: Pin<&mut Self>) -> Fuse<Pin<&mut Self::Io>, &mut Self::Codec> {
let self_ = self.project();
Fuse {
io: self_.io,
codec: self_.codec,
}
inner: FramedImpl<T, U, RWFrames>
}
}

Expand Down Expand Up @@ -93,7 +61,11 @@ where
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn new(inner: T, codec: U) -> Framed<T, U> {
Framed {
inner: framed_read2(framed_write2(Fuse { io: inner, codec })),
inner: FramedImpl {
inner,
codec,
state: Default::default(),
},
}
}

Expand Down Expand Up @@ -123,10 +95,18 @@ where
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
Framed {
inner: framed_read2_with_buffer(
framed_write2(Fuse { io: inner, codec }),
BytesMut::with_capacity(capacity),
),
inner: FramedImpl {
inner,
codec,
state: RWFrames {
read: ReadFrame {
eof: false,
is_readable: false,
buffer: BytesMut::with_capacity(capacity),
},
write: WriteFrame::default(),
},
},
}
}
}
Expand Down Expand Up @@ -161,16 +141,14 @@ impl<T, U> Framed<T, U> {
/// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
Framed {
inner: framed_read2_with_buffer(
framed_write2_with_buffer(
Fuse {
io: parts.io,
codec: parts.codec,
},
parts.write_buf,
),
parts.read_buf,
),
inner: FramedImpl {
inner: parts.io,
codec: parts.codec,
state: RWFrames {
read: parts.read_buf.into(),
write: parts.write_buf.into(),
},
},
}
}

Expand All @@ -181,7 +159,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_ref(&self) -> &T {
&self.inner.get_ref().get_ref().io
&self.inner.inner
}

/// Returns a mutable reference to the underlying I/O stream wrapped by
Expand All @@ -191,7 +169,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner.get_mut().get_mut().io
&mut self.inner.inner
}

/// Returns a reference to the underlying codec wrapped by
Expand All @@ -200,7 +178,7 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
pub fn codec(&self) -> &U {
&self.inner.get_ref().get_ref().codec
&self.inner.codec
}

/// Returns a mutable reference to the underlying codec wrapped by
Expand All @@ -209,12 +187,12 @@ impl<T, U> Framed<T, U> {
/// Note that care should be taken to not tamper with the underlying codec
/// as it may corrupt the stream of frames otherwise being worked with.
pub fn codec_mut(&mut self) -> &mut U {
&mut self.inner.get_mut().get_mut().codec
&mut self.inner.codec
}

/// Returns a reference to the read buffer.
pub fn read_buffer(&self) -> &BytesMut {
self.inner.buffer()
&self.inner.state.read.buffer
}

/// Consumes the `Framed`, returning its underlying I/O stream.
Expand All @@ -223,7 +201,7 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_inner(self) -> T {
self.inner.into_inner().into_inner().io
self.inner.inner
}

/// Consumes the `Framed`, returning its underlying I/O stream, the buffer
Expand All @@ -233,19 +211,17 @@ impl<T, U> Framed<T, U> {
/// of data coming in as it may corrupt the stream of frames otherwise
/// being worked with.
pub fn into_parts(self) -> FramedParts<T, U> {
let (inner, read_buf) = self.inner.into_parts();
let (inner, write_buf) = inner.into_parts();

FramedParts {
io: inner.io,
codec: inner.codec,
read_buf,
write_buf,
io: self.inner.inner,
codec: self.inner.codec,
read_buf: self.inner.state.read.buffer,
write_buf: self.inner.state.write.buffer,
_priv: (),
}
}
}

// This impl just defers to the underlying FramedImpl
impl<T, U> Stream for Framed<T, U>
where
T: AsyncRead,
Expand All @@ -258,6 +234,7 @@ where
}
}

// This impl just defers to the underlying FramedImpl
impl<T, I, U> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
Expand All @@ -267,19 +244,19 @@ where
type Error = U::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.get_pin_mut().poll_ready(cx)
self.project().inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
self.project().inner.get_pin_mut().start_send(item)
self.project().inner.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.get_pin_mut().poll_flush(cx)
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.get_pin_mut().poll_close(cx)
self.project().inner.poll_close(cx)
}
}

Expand All @@ -290,103 +267,12 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Framed")
.field("io", &self.inner.get_ref().get_ref().io)
.field("codec", &self.inner.get_ref().get_ref().codec)
.field("io", self.get_ref())
.field("codec", self.codec())
.finish()
}
}

// ===== impl Fuse =====

impl<T: Read, U> Read for Fuse<T, U> {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.io.read(dst)
}
}

impl<T: BufRead, U> BufRead for Fuse<T, U> {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.io.fill_buf()
}

fn consume(&mut self, amt: usize) {
self.io.consume(amt)
}
}

impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
self.io.prepare_uninitialized_buffer(buf)
}

fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
self.project().io.poll_read(cx, buf)
}
}

impl<T: AsyncBufRead, U> AsyncBufRead for Fuse<T, U> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.project().io.poll_fill_buf(cx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.project().io.consume(amt)
}
}

impl<T: Write, U> Write for Fuse<T, U> {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.io.write(src)
}

fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}

impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.project().io.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().io.poll_flush(cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().io.poll_shutdown(cx)
}
}

impl<T, U: Decoder> Decoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;

fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.codec.decode(buffer)
}

fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
self.codec.decode_eof(buffer)
}
}

impl<T, I, U: Encoder<I>> Encoder<I> for Fuse<T, U> {
type Error = U::Error;

fn encode(&mut self, item: I, dst: &mut BytesMut) -> Result<(), Self::Error> {
self.codec.encode(item, dst)
}
}

/// `FramedParts` contains an export of the data of a Framed transport.
/// It can be used to construct a new [`Framed`] with a different codec.
/// It contains all current buffers and the inner transport.
Expand Down
Loading

0 comments on commit 221f421

Please sign in to comment.