diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 597ec9163..39e4bd8fd 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -141,6 +141,8 @@ pub trait ToStreamAsync { /// Event::Progress(Some(0)), /// ]); /// + /// let native_stream = Box::pin(native_stream); + /// /// let (data1, data2) = timely::example(|scope| { /// let data1 = native_stream.to_stream(scope).capture(); /// let data2 = vec![0,1,2].to_stream(scope).capture(); @@ -150,7 +152,7 @@ pub trait ToStreamAsync { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream>(self, scope: &S) -> Stream; + fn to_stream>(self: Pin>, scope: &S) -> Stream; } impl ToStreamAsync for I @@ -158,9 +160,9 @@ where D: Data, T: Timestamp, F: IntoIterator, - I: futures_util::stream::Stream> + Unpin + 'static, + I: futures_util::stream::Stream> + ?Sized + 'static, { - fn to_stream>(mut self, scope: &S) -> Stream { + fn to_stream>(mut self: Pin>, scope: &S) -> Stream { source(scope, "ToStreamAsync", move |capability, info| { let activator = Arc::new(scope.sync_activator_for(&info.address[..])); @@ -171,7 +173,7 @@ where let mut context = Context::from_waker(&waker); // Consume all the ready items of the source_stream and issue them to the operator - while let Poll::Ready(item) = Pin::new(&mut self).poll_next(&mut context) { + while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) { match item { Some(Event::Progress(time)) => { cap_set.downgrade(time);