-
-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor!: Reimplement stream using futures and without split #290
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks really nice! I would like just to proposed some minor improvements (syntax and formating):
diff --git a/src/stream.rs b/src/stream.rs
index 8f6e8f7..42f6cc5 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,13 +1,15 @@
use std::{
collections::VecDeque,
convert::Infallible,
- future::{poll_fn, Future},
+ future::poll_fn,
io::IoSlice,
- pin::pin,
- task::{Context, Poll},
+ task::{ready, Context, Poll},
};
-use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+ FutureExt,
+};
#[cfg(debug_assertions)]
use imap_codec::imap_types::utils::escape_byte_string;
use thiserror::Error;
@@ -99,13 +101,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
self.read_buffer.poll_read(&mut self.stream, cx)?
};
- if let Poll::Ready(bytes) = result {
- // Provide input bytes to the client/server and try again
- state.enqueue_input(bytes);
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
+ let bytes = ready!(result);
+ // Provide input bytes to the client/server and try again
+ state.enqueue_input(bytes);
+ Poll::Ready(Ok(()))
})
.await?;
};
@@ -148,10 +147,7 @@ impl ReadBuffer {
cx: &mut Context<'_>,
) -> Poll<Result<&[u8], ReadWriteError>> {
// Constructing this future is cheap
- let read_future = pin!(stream.read(&mut self.bytes));
- let Poll::Ready(byte_count) = read_future.poll(cx)? else {
- return Poll::Pending;
- };
+ let byte_count = ready!(stream.read(&mut self.bytes).poll_unpin(cx)?);
#[cfg(debug_assertions)]
trace!(
@@ -200,20 +196,16 @@ impl WriteBuffer {
stream: &mut S,
cx: &mut Context<'_>,
) -> Poll<Result<(), ReadWriteError>> {
- while !self.bytes.is_empty() {
+ while self.needs_write() {
let write_slices = &self.write_slices();
// Constructing this future is cheap
- let write_future = pin!(stream.write_vectored(write_slices));
- let Poll::Ready(byte_count) = write_future.poll(cx)? else {
- return Poll::Pending;
- };
+ let byte_count = ready!(stream.write_vectored(write_slices).poll_unpin(cx)?);
#[cfg(debug_assertions)]
trace!(
data = escape_byte_string(
- self
- .bytes
+ self.bytes
.iter()
.copied()
.take(byte_count)
For this patch, I got inspiration from this blog post. Using const bools internally allows us to propose same named functions on the same structure but with different signature. It can be made transparent for users with simple aliases. diff --git a/src/stream.rs b/src/stream.rs
index 8f6e8f7..a7821ea 100644
--- a/src/stream.rs
+++ b/src/stream.rs
@@ -1,13 +1,15 @@
use std::{
collections::VecDeque,
convert::Infallible,
- future::{poll_fn, Future},
- io::IoSlice,
- pin::pin,
- task::{Context, Poll},
+ future::poll_fn,
+ io::{IoSlice, Read, Write},
+ task::{ready, Context, Poll},
};
-use futures_util::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+ FutureExt,
+};
#[cfg(debug_assertions)]
use imap_codec::imap_types::utils::escape_byte_string;
use thiserror::Error;
@@ -16,28 +18,32 @@ use tracing::trace;
use crate::{Interrupt, Io, State};
-pub struct Stream<S> {
+pub const ASYNC: bool = true;
+pub const BLOCKING: bool = false;
+
+pub type Stream<S> = DuplexStream<S, ASYNC>;
+pub type BlockingStream<S> = DuplexStream<S, BLOCKING>;
+
+pub struct DuplexStream<S, const ASYNC: bool> {
stream: S,
- read_buffer: ReadBuffer,
- write_buffer: WriteBuffer,
+ read_buffer: ReadBuffer<ASYNC>,
+ write_buffer: WriteBuffer<ASYNC>,
}
-impl<S> Stream<S> {
+impl<S, const ASYNC: bool> DuplexStream<S, ASYNC> {
pub fn new(stream: S) -> Self {
Self {
stream,
- read_buffer: ReadBuffer::new(),
- write_buffer: WriteBuffer::new(),
+ read_buffer: ReadBuffer::<ASYNC>::new(),
+ write_buffer: WriteBuffer::<ASYNC>::new(),
}
}
-}
-impl<S> Stream<S> {
- #[cfg(feature = "expose_stream")]
/// Return the underlying stream for debug purposes (or experiments).
///
/// Note: Writing to or reading from the stream may introduce
/// conflicts with `imap-next`.
+ #[cfg(feature = "expose_stream")]
pub fn stream_mut(&mut self) -> &mut S {
&mut self.stream
}
@@ -99,13 +105,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
self.read_buffer.poll_read(&mut self.stream, cx)?
};
- if let Poll::Ready(bytes) = result {
- // Provide input bytes to the client/server and try again
- state.enqueue_input(bytes);
- Poll::Ready(Ok(()))
- } else {
- Poll::Pending
- }
+ let bytes = ready!(result);
+ // Provide input bytes to the client/server and try again
+ state.enqueue_input(bytes);
+ Poll::Ready(Ok(()))
})
.await?;
};
@@ -114,6 +117,42 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
}
}
+impl<S: Read + Write> BlockingStream<S> {
+ pub fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
+ let event = loop {
+ // Progress the client/server
+ let result = state.next();
+
+ // Return events immediately without doing IO
+ let interrupt = match result {
+ Err(interrupt) => interrupt,
+ Ok(event) => break event,
+ };
+
+ // Return errors immediately without doing IO
+ let io = match interrupt {
+ Interrupt::Io(io) => io,
+ Interrupt::Error(err) => return Err(Error::State(err)),
+ };
+
+ // Handle the output bytes from the client/server
+ if let Io::Output(bytes) = io {
+ self.write_buffer.push_bytes(bytes);
+ }
+
+ // Progress the stream
+ if self.write_buffer.needs_write() {
+ self.write_buffer.write(&mut self.stream)?;
+ };
+
+ let bytes = self.read_buffer.read(&mut self.stream)?;
+ state.enqueue_input(bytes);
+ };
+
+ Ok(event)
+ }
+}
+
/// Error during reading into or writing from a stream.
#[derive(Debug, Error)]
pub enum Error<E> {
@@ -131,27 +170,26 @@ pub enum Error<E> {
State(E),
}
-struct ReadBuffer {
+struct ReadBuffer<const ASYNC: bool> {
bytes: Box<[u8]>,
}
-impl ReadBuffer {
+impl<const ASYNC: bool> ReadBuffer<ASYNC> {
fn new() -> Self {
Self {
bytes: vec![0; 1024].into(),
}
}
+}
+impl ReadBuffer<ASYNC> {
fn poll_read<S: AsyncRead + Unpin>(
&mut self,
stream: &mut S,
cx: &mut Context<'_>,
) -> Poll<Result<&[u8], ReadWriteError>> {
// Constructing this future is cheap
- let read_future = pin!(stream.read(&mut self.bytes));
- let Poll::Ready(byte_count) = read_future.poll(cx)? else {
- return Poll::Pending;
- };
+ let byte_count = ready!(stream.read(&mut self.bytes).poll_unpin(cx)?);
#[cfg(debug_assertions)]
trace!(
@@ -168,14 +206,34 @@ impl ReadBuffer {
}
}
-struct WriteBuffer {
+impl ReadBuffer<BLOCKING> {
+ fn read<S: Read>(&mut self, stream: &mut S) -> Result<&[u8], ReadWriteError> {
+ // Constructing this future is cheap
+ let byte_count = stream.read(&mut self.bytes)?;
+
+ #[cfg(debug_assertions)]
+ trace!(
+ data = escape_byte_string(&self.bytes[0..byte_count]),
+ "io/read/raw"
+ );
+
+ if byte_count == 0 {
+ // The result is 0 if the stream reached "end of file"
+ return Err(ReadWriteError::Closed);
+ }
+
+ Ok(&self.bytes[0..byte_count])
+ }
+}
+
+struct WriteBuffer<const ASYNC: bool> {
/// Output bytes that needs to be written.
///
/// Enqueue output bytes to the back, dequeue written bytes from the front.
bytes: VecDeque<u8>,
}
-impl WriteBuffer {
+impl<const ASYNC: bool> WriteBuffer<ASYNC> {
fn new() -> Self {
Self {
bytes: VecDeque::new(),
@@ -194,26 +252,24 @@ impl WriteBuffer {
let (init, tail) = self.bytes.as_slices();
[IoSlice::new(init), IoSlice::new(tail)]
}
+}
+impl WriteBuffer<ASYNC> {
fn poll_write<S: AsyncWrite + Unpin>(
&mut self,
stream: &mut S,
cx: &mut Context<'_>,
) -> Poll<Result<(), ReadWriteError>> {
- while !self.bytes.is_empty() {
+ while self.needs_write() {
let write_slices = &self.write_slices();
// Constructing this future is cheap
- let write_future = pin!(stream.write_vectored(write_slices));
- let Poll::Ready(byte_count) = write_future.poll(cx)? else {
- return Poll::Pending;
- };
+ let byte_count = ready!(stream.write_vectored(write_slices).poll_unpin(cx)?);
#[cfg(debug_assertions)]
trace!(
data = escape_byte_string(
- self
- .bytes
+ self.bytes
.iter()
.copied()
.take(byte_count)
@@ -237,6 +293,41 @@ impl WriteBuffer {
}
}
+impl WriteBuffer<BLOCKING> {
+ fn write<S: Write>(&mut self, stream: &mut S) -> Result<(), ReadWriteError> {
+ while self.needs_write() {
+ let write_slices = &self.write_slices();
+
+ // Constructing this future is cheap
+ let byte_count = stream.write_vectored(write_slices)?;
+
+ #[cfg(debug_assertions)]
+ trace!(
+ data = escape_byte_string(
+ self.bytes
+ .iter()
+ .copied()
+ .take(byte_count)
+ .collect::<Vec<_>>()
+ ),
+ "io/write/raw"
+ );
+
+ // Drop written bytes
+ drop(self.bytes.drain(..byte_count));
+
+ if byte_count == 0 {
+ // The result is 0 if the stream doesn't accept bytes anymore or the write buffer
+ // was already empty before calling `write_buf`. Because we checked the buffer
+ // we know that the first case occurred.
+ return Err(ReadWriteError::Closed);
+ }
+ }
+
+ Ok(())
+ }
+}
+
#[derive(Debug, Error)]
enum ReadWriteError {
#[error("Stream was closed")] |
Your first patch looks good, thanks. I committed it.
I would prefer a separate issue or PR for that. But I'm not a big fan of this topic. See below.
Not sure how things get easier with a separate crate. The
No hurry, STARTTLS is a complicated topic. We need to be slow and careful.
Yes, e.g.
Hm, I don't like it. It looks too complicated. I think the type magic is unnecessary, we could achieve something similar by adding a simple impl<S: std::io::Read + std::io::Write> Stream<S> {
pub fn block_next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
// ...
}
} However, the bigger problem is that the sync implementation can't write in read in parallel. If server and client are using this implementation, both would insist to write first and could block each other by that. The async implementation don't have this issue. Maybe we can solve or accept that problem, but I'm not motivated to dig into this issue. |
f24f8ae
to
de9d490
Compare
I believe it would be really useful, and would have no impact on your actual usage in Similar to the actual Stream:
The main differences with Stream:
It should integrate effortlessly with the actual Stream (EDIT: see last comment for example). Such a Duplex Stream would be so useful outside of
Sure, I wanted to say that I will be ready soon to propose something, so we can discuss on. I find it easier to iterate over sth concrete.
The futures crate exposes a I will be able to experiment inside Pimalaya libs anyway, it should give me a great feadback about usability of such idea.
This forces you to have multiple functions with different names, which I don't like either. The user experience is way better and is far more intuitive when you have same function names (even with different implems: params, async etc). impl<S: std::io::Read + std::io::Write> Stream<S, true> {
pub fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
// ...
}
}
impl<S: futures::AsyncRead + futures::AsyncWrite> Stream<S, false> {
pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
// ...
}
}
let mut tcp_stream = std::net::TcpStream::connect(…);
let mut stream = Stream::from(tcp_stream);
stream.next()
let mut tcp_stream = async_std::net::TcpStream::connect(…);
let mut stream = Stream::from(tcp_stream);
stream.next().await
// versus
stream.block_next()
stream.next().await
BlockStream::from(tcp_stream)
…
I have some idea that I want to experiment. It's another good argument for exporting the duplex stream, this logic could go there and not impact the actual stream. I am ready to do the export thing and even to maintain it! |
I just found sth interesting for https://docs.rs/futures-util/latest/futures_util/io/struct.AllowStdIo.html |
EDIT: I renamed the project https://github.com/pimalaya/core/tree/master/buf-stream |
This is how it would integrate with |
As I mentionned here: #169 (comment), I ran into several issues with the previous implementation. The actual proposition is the following:
|
I misunderstood your proposal. I thought you want to move the entire However, this is only implementation detail. I would focus on getting this PR merged. I'll refactor the current implementation and introduce a simple About naming: I like
You are absolutely right. If this PR succeeds and About sync/async. I think the most common approach is to use different traits like For now I need to tackle some other issues first. The CI is broken and I noticed a fundamental flaw in |
Co-authored-by: Clément DOUIN <clement.douin@posteo.net>
Co-authored-by: Clément DOUIN <clement.douin@posteo.net>
de9d490
to
49f6d84
Compare
Sounds great, do not hesitate to take stuff from my attempt in Pimalaya core.
I initially liked So far,
Sure!
My concern is that the lib would not be usable without runtime. You will always have someone that cannot use async and would like to use the lib. I don't think it would be such a burden to maintain a Supporting blocking with std and async with futures should be the bare minimum. Other runtimes are optional and could be supported user side, with compatibility layers (like
Sure, let me know if and how I could help. |
If I understood well, it is technically not possible to read from and write into a stream at the same time:
I also dived into the buffered reader/writer topic:
If I don't mistake,
As I said earlier, there is no efficient way to both read and write at the same time. I would rather optimize the usage of read/write inside match io {
Io::Output(bytes) => {
self.stream.write(&bytes).await?;
// We should always receive bytes, since we just wrote sth.
// Deadlock cannot happen here.
let mut bytes = vec![0; 1024];
let n = self.stream.read(&bytes).await?;
state.enqueue_input(&bytes[..n]);
}
Io::NeedMoreInput => {
// Is there such a case where the state needs more input,
// but nothing more is available on the stream?
let mut bytes = vec![0; 1024];
let n = self.stream.read(&bytes).await?;
state.enqueue_input(&bytes[..n]);
}
} I will experiment this idea with high-level apps to see how it behaves. |
Very good news: this simple implementation seems to work as expected: impl<S: AsyncRead + AsyncWrite + Unpin> Stream<S> {
pub async fn next<F: State>(&mut self, mut state: F) -> Result<F::Event, Error<F::Error>> {
let event = loop {
// Progress the client/server
let result = state.next();
// Return events immediately without doing IO
let interrupt = match result {
Err(interrupt) => interrupt,
Ok(event) => break event,
};
// Return errors immediately without doing IO
let io = match interrupt {
Interrupt::Io(io) => io,
Interrupt::Error(err) => return Err(Error::State(err)),
};
// Handle the output bytes from the client/server
if let Io::Output(ref bytes) = io {
self.stream.write(bytes).await?;
}
// After a write, or if more input is need
let n = self.stream.read(&mut self.buf).await?;
state.enqueue_input(&self.buf[..n]);
};
Ok(event)
}
} I tried this implementation with a simple client and with a synchronizer (using client pool) over 5k+ mails, I did not encounter a single deadlock. My conclusion is that we do not need buffering nor "simultaneous" read and write, just direct read and write calls is enough. |
Side note regarding TLS: I also tried pub struct MaybeTlsStream(Either<Compat<TcpStream>, Compat<TlsStream<TcpStream>>>);
impl MaybeTlsStream {
pub fn tcp(stream: TcpStream) -> Self {
Self(Either::Left(stream.compat()))
}
pub fn tls(stream: TlsStream<TcpStream>) -> Self {
Self(Either::Right(stream.compat()))
}
}
impl AsyncRead for MaybeTlsStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
match &mut self.get_mut().0 {
Either::Left(s) => Pin::new(s).poll_read(cx, buf),
Either::Right(s) => Pin::new(s).poll_read(cx, buf),
}
}
}
impl AsyncWrite for MaybeTlsStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
match &mut self.get_mut().0 {
Either::Left(s) => Pin::new(s).poll_write(cx, buf),
Either::Right(s) => Pin::new(s).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match &mut self.get_mut().0 {
Either::Left(s) => Pin::new(s).poll_flush(cx),
Either::Right(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
match &mut self.get_mut().0 {
Either::Left(s) => Pin::new(s).poll_close(cx),
Either::Right(s) => Pin::new(s).poll_close(cx),
}
}
} The client can just hold a pub struct Client {
host: String,
stream: Stream<MaybeTlsStream>,
resolver: Resolver,
capabilities: Vec1<Capability<'static>>,
idle_timeout: Duration,
} |
This is an alternative to #274.
It simplifies
Stream
drastically:Highlights:
Stream
is now based onfutures-io
(or more specificfutures-util
)tokio
,tokio-rustls
,bytes
tokio
Compat
fromtokio-util
allows us to be compatible withfutures-io
State
as soon as possiblefutures-io
is compatible with async-std and tokiointegration-test
with TLS #245Closes #275
Closes #212
Closes #169
Closes #245