From dede6fef9a50650efdd1ee36b2466911cc1b1950 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Tue, 28 Jan 2020 15:06:07 -0500 Subject: [PATCH] Use `Pin>` in `BodyStream` and `SizedStream` Fixes #1321 A better fix would be to change `MessageBody` to take a `Pin<&mut Self>`, rather than a `Pin<&mut Self>`. This will avoid requiring the use of `Box` for all consumers by allowing the caller to determine how to pin the `MessageBody` implementation (e.g. via stack pinning). However, doing so is a breaking change that will affect every user of `MessageBody`. By pinning the inner stream ourselves, we can fix the undefined behavior without breaking the API. I've included @sebzim4500's reproduction case as a new test case. However, due to the nature of undefined behavior, this could pass (and not segfault) even if underlying issue were to regress. Unfortunately, until rust-lang/unsafe-code-guidelines#148 is resolved, it's not even possible to write a Miri test that will pass when the bug is fixed. --- actix-http/src/body.rs | 16 ++++++---------- tests/test_weird_poll.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 10 deletions(-) create mode 100644 tests/test_weird_poll.rs diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 88176443956..e2bcce3593b 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -361,10 +361,8 @@ impl MessageBody for String { /// Type represent streaming body. /// Response does not contain `content-length` header and appropriate transfer encoding is used. -#[pin_project] pub struct BodyStream { - #[pin] - stream: S, + stream: Pin>, _t: PhantomData, } @@ -375,7 +373,7 @@ where { pub fn new(stream: S) -> Self { BodyStream { - stream, + stream: Box::pin(stream), _t: PhantomData, } } @@ -396,7 +394,7 @@ where /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + let mut stream = self.stream.as_mut(); loop { return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, @@ -408,11 +406,9 @@ where /// Type represent streaming body. This body implementation should be used /// if total size of stream is known. Data get sent as is without using transfer encoding. -#[pin_project] pub struct SizedStream { size: u64, - #[pin] - stream: S, + stream: Pin>, } impl SizedStream @@ -420,7 +416,7 @@ where S: Stream>, { pub fn new(size: u64, stream: S) -> Self { - SizedStream { size, stream } + SizedStream { size, stream: Box::pin(stream) } } } @@ -438,7 +434,7 @@ where /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream; + let mut stream = self.stream.as_mut(); loop { return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, diff --git a/tests/test_weird_poll.rs b/tests/test_weird_poll.rs new file mode 100644 index 00000000000..21d1d611a0d --- /dev/null +++ b/tests/test_weird_poll.rs @@ -0,0 +1,26 @@ +// Regression test for #/1321 + +use futures::task::{noop_waker, Context}; +use futures::stream::once; +use actix_http::body::{MessageBody, BodyStream}; +use bytes::Bytes; + +#[test] +fn weird_poll() { + let (sender, receiver) = futures::channel::oneshot::channel(); + let mut body_stream = Ok(BodyStream::new(once(async { + let x = Box::new(0); + let y = &x; + receiver.await.unwrap(); + let _z = **y; + Ok::<_, ()>(Bytes::new()) + }))); + + let waker = noop_waker(); + let mut context = Context::from_waker(&waker); + + let _ = body_stream.as_mut().unwrap().poll_next(&mut context); + sender.send(()).unwrap(); + let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context); +} +