Skip to content

Commit

Permalink
Revert "async: relax requirements of supplied streams (TimelyDataflow…
Browse files Browse the repository at this point in the history
…#401)"

This reverts commit c5195eb.
  • Loading branch information
antiguru committed Feb 5, 2024
1 parent 6df895d commit 806540c
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions timely/src/dataflow/operators/to_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ pub trait ToStreamAsync<T: Timestamp, D: Data> {
/// Event::Progress(Some(0)),
/// ]);
///
/// let native_stream = Box::pin(native_stream);
///
/// let (data1, data2) = timely::example(|scope| {
/// let data1 = native_stream.to_stream(scope).capture();
/// let data2 = vec![0,1,2].to_stream(scope).capture();
Expand All @@ -150,17 +152,17 @@ pub trait ToStreamAsync<T: Timestamp, D: Data> {
///
/// assert_eq!(data1.extract(), data2.extract());
/// ```
fn to_stream<S: Scope<Timestamp = T>>(self, scope: &S) -> Stream<S, D>;
fn to_stream<S: Scope<Timestamp = T>>(self: Pin<Box<Self>>, scope: &S) -> Stream<S, D>;
}

impl<T, D, F, I> ToStreamAsync<T, D> for I
where
D: Data,
T: Timestamp,
F: IntoIterator<Item = T>,
I: futures_util::stream::Stream<Item = Event<F, D>> + Unpin + 'static,
I: futures_util::stream::Stream<Item = Event<F, D>> + ?Sized + 'static,
{
fn to_stream<S: Scope<Timestamp = T>>(mut self, scope: &S) -> Stream<S, D> {
fn to_stream<S: Scope<Timestamp = T>>(mut self: Pin<Box<Self>>, scope: &S) -> Stream<S, D> {
source(scope, "ToStreamAsync", move |capability, info| {
let activator = Arc::new(scope.sync_activator_for(&info.address[..]));

Expand All @@ -171,7 +173,7 @@ where
let mut context = Context::from_waker(&waker);

// Consume all the ready items of the source_stream and issue them to the operator
while let Poll::Ready(item) = Pin::new(&mut self).poll_next(&mut context) {
while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) {
match item {
Some(Event::Progress(time)) => {
cap_set.downgrade(time);
Expand Down

0 comments on commit 806540c

Please sign in to comment.