From 0d5e4b4b84e161a2a4a4c49e846240cb078b6ceb Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann <antiguru@gmail.com>
Date: Mon, 5 Feb 2024 15:07:01 -0500
Subject: [PATCH] Revert "Add support for native async stream sources (#357)"

This reverts commit 935b7e90b39cc820bcc76918bbfc8b964fab1f1a.
---
 timely/Cargo.toml                          |  1 -
 timely/src/dataflow/operators/mod.rs       |  2 +-
 timely/src/dataflow/operators/to_stream.rs | 85 +---------------------
 timely/src/scheduling/activate.rs          |  8 --
 4 files changed, 4 insertions(+), 92 deletions(-)

diff --git a/timely/Cargo.toml b/timely/Cargo.toml
index 0b550e53c..9d8b68aad 100644
--- a/timely/Cargo.toml
+++ b/timely/Cargo.toml
@@ -31,7 +31,6 @@ timely_logging = { path = "../logging", version = "0.12" }
 timely_communication = { path = "../communication", version = "0.12", default-features = false }
 timely_container = { path = "../container", version = "0.12" }
 crossbeam-channel = "0.5.0"
-futures-util = "0.3"
 
 [dev-dependencies]
 # timely_sort="0.1.6"
diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs
index 508d10ac6..1eec15cba 100644
--- a/timely/src/dataflow/operators/mod.rs
+++ b/timely/src/dataflow/operators/mod.rs
@@ -21,7 +21,7 @@ pub use self::delay::Delay;
 pub use self::exchange::Exchange;
 pub use self::broadcast::Broadcast;
 pub use self::probe::Probe;
-pub use self::to_stream::{ToStream, ToStreamCore, ToStreamAsync, Event};
+pub use self::to_stream::{ToStream, ToStreamCore};
 pub use self::capture::Capture;
 pub use self::branch::{Branch, BranchWhen};
 pub use self::ok_err::OkErr;
diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs
index 39e4bd8fd..f898a8981 100644
--- a/timely/src/dataflow/operators/to_stream.rs
+++ b/timely/src/dataflow/operators/to_stream.rs
@@ -1,15 +1,15 @@
 //! Conversion to the `Stream` type from iterators.
 
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
 use crate::Container;
 
 use crate::dataflow::operators::generic::operator::source;
 use crate::dataflow::operators::CapabilitySet;
 use crate::dataflow::{StreamCore, Scope, Stream};
 use crate::progress::Timestamp;
+
 use crate::Data;
+use crate::dataflow::operators::generic::operator::source;
+use crate::dataflow::{Stream, Scope};
 
 /// Converts to a timely `Stream`.
 pub trait ToStream<T: Timestamp, D: Data> {
@@ -112,82 +112,3 @@ impl<T: Timestamp, I: IntoIterator+'static> ToStreamCore<T, I::Item> for I where
         })
     }
 }
-
-/// Data and progress events of the native stream.
-pub enum Event<F: IntoIterator, D> {
-    /// Indicates that timestamps have advanced to frontier F
-    Progress(F),
-    /// Indicates that event D happened at time T
-    Message(F::Item, D),
-}
-
-/// Converts to a timely `Stream`.
-pub trait ToStreamAsync<T: Timestamp, D: Data> {
-    /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely
-    /// `Stream`](crate::dataflow::Stream).
-    ///
-    /// # Examples
-    ///
-    /// ```
-    /// use futures_util::stream;
-    ///
-    /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync};
-    /// use timely::dataflow::operators::capture::Extract;
-    ///
-    /// let native_stream = stream::iter(vec![
-    ///     Event::Message(0, 0),
-    ///     Event::Message(0, 1),
-    ///     Event::Message(0, 2),
-    ///     Event::Progress(Some(0)),
-    /// ]);
-    ///
-    /// let native_stream = Box::pin(native_stream);
-    ///
-    /// let (data1, data2) = timely::example(|scope| {
-    ///     let data1 = native_stream.to_stream(scope).capture();
-    ///     let data2 = vec![0,1,2].to_stream(scope).capture();
-    ///
-    ///     (data1, data2)
-    /// });
-    ///
-    /// assert_eq!(data1.extract(), data2.extract());
-    /// ```
-    fn to_stream<S: Scope<Timestamp = T>>(self: Pin<Box<Self>>, scope: &S) -> Stream<S, D>;
-}
-
-impl<T, D, F, I> ToStreamAsync<T, D> for I
-where
-    D: Data,
-    T: Timestamp,
-    F: IntoIterator<Item = T>,
-    I: futures_util::stream::Stream<Item = Event<F, D>> + ?Sized + 'static,
-{
-    fn to_stream<S: Scope<Timestamp = T>>(mut self: Pin<Box<Self>>, scope: &S) -> Stream<S, D> {
-        source(scope, "ToStreamAsync", move |capability, info| {
-            let activator = Arc::new(scope.sync_activator_for(&info.address[..]));
-
-            let mut cap_set = CapabilitySet::from_elem(capability);
-
-            move |output| {
-                let waker = futures_util::task::waker_ref(&activator);
-                let mut context = Context::from_waker(&waker);
-
-                // Consume all the ready items of the source_stream and issue them to the operator
-                while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) {
-                    match item {
-                        Some(Event::Progress(time)) => {
-                            cap_set.downgrade(time);
-                        }
-                        Some(Event::Message(time, data)) => {
-                            output.session(&cap_set.delayed(&time)).give(data);
-                        }
-                        None => {
-                            cap_set.downgrade(&[]);
-                            break;
-                        }
-                    }
-                }
-            }
-        })
-    }
-}
diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs
index e91c87a99..e86f44355 100644
--- a/timely/src/scheduling/activate.rs
+++ b/timely/src/scheduling/activate.rs
@@ -1,14 +1,12 @@
 //! Parking and unparking timely fibers.
 
 use std::rc::Rc;
-use std::sync::Arc;
 use std::cell::RefCell;
 use std::thread::Thread;
 use std::collections::BinaryHeap;
 use std::time::{Duration, Instant};
 use std::cmp::Reverse;
 use crossbeam_channel::{Sender, Receiver};
-use futures_util::task::ArcWake;
 
 /// Methods required to act as a timely scheduler.
 ///
@@ -274,12 +272,6 @@ impl SyncActivator {
     }
 }
 
-impl ArcWake for SyncActivator {
-    fn wake_by_ref(arc_self: &Arc<Self>) {
-        arc_self.activate().unwrap();
-    }
-}
-
 /// The error returned when activation fails across thread boundaries because
 /// the receiving end has hung up.
 #[derive(Clone, Copy, Debug)]