From 0d5e4b4b84e161a2a4a4c49e846240cb078b6ceb Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 5 Feb 2024 15:07:01 -0500 Subject: [PATCH] Revert "Add support for native async stream sources (#357)" This reverts commit 935b7e90b39cc820bcc76918bbfc8b964fab1f1a. --- timely/Cargo.toml | 1 - timely/src/dataflow/operators/mod.rs | 2 +- timely/src/dataflow/operators/to_stream.rs | 85 +--------------------- timely/src/scheduling/activate.rs | 8 -- 4 files changed, 4 insertions(+), 92 deletions(-) diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 0b550e53c..9d8b68aad 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -31,7 +31,6 @@ timely_logging = { path = "../logging", version = "0.12" } timely_communication = { path = "../communication", version = "0.12", default-features = false } timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" -futures-util = "0.3" [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 508d10ac6..1eec15cba 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -21,7 +21,7 @@ pub use self::delay::Delay; pub use self::exchange::Exchange; pub use self::broadcast::Broadcast; pub use self::probe::Probe; -pub use self::to_stream::{ToStream, ToStreamCore, ToStreamAsync, Event}; +pub use self::to_stream::{ToStream, ToStreamCore}; pub use self::capture::Capture; pub use self::branch::{Branch, BranchWhen}; pub use self::ok_err::OkErr; diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/to_stream.rs index 39e4bd8fd..f898a8981 100644 --- a/timely/src/dataflow/operators/to_stream.rs +++ b/timely/src/dataflow/operators/to_stream.rs @@ -1,15 +1,15 @@ //! Conversion to the `Stream` type from iterators. -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; use crate::Container; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::CapabilitySet; use crate::dataflow::{StreamCore, Scope, Stream}; use crate::progress::Timestamp; + use crate::Data; +use crate::dataflow::operators::generic::operator::source; +use crate::dataflow::{Stream, Scope}; /// Converts to a timely `Stream`. pub trait ToStream { @@ -112,82 +112,3 @@ impl ToStreamCore for I where }) } } - -/// Data and progress events of the native stream. -pub enum Event { - /// Indicates that timestamps have advanced to frontier F - Progress(F), - /// Indicates that event D happened at time T - Message(F::Item, D), -} - -/// Converts to a timely `Stream`. -pub trait ToStreamAsync { - /// Converts a [native `Stream`](futures_util::stream::Stream) of [`Event`s](Event) into a [timely - /// `Stream`](crate::dataflow::Stream). - /// - /// # Examples - /// - /// ``` - /// use futures_util::stream; - /// - /// use timely::dataflow::operators::{Capture, Event, ToStream, ToStreamAsync}; - /// use timely::dataflow::operators::capture::Extract; - /// - /// let native_stream = stream::iter(vec![ - /// Event::Message(0, 0), - /// Event::Message(0, 1), - /// Event::Message(0, 2), - /// Event::Progress(Some(0)), - /// ]); - /// - /// let native_stream = Box::pin(native_stream); - /// - /// let (data1, data2) = timely::example(|scope| { - /// let data1 = native_stream.to_stream(scope).capture(); - /// let data2 = vec![0,1,2].to_stream(scope).capture(); - /// - /// (data1, data2) - /// }); - /// - /// assert_eq!(data1.extract(), data2.extract()); - /// ``` - fn to_stream>(self: Pin>, scope: &S) -> Stream; -} - -impl ToStreamAsync for I -where - D: Data, - T: Timestamp, - F: IntoIterator, - I: futures_util::stream::Stream> + ?Sized + 'static, -{ - fn to_stream>(mut self: Pin>, scope: &S) -> Stream { - source(scope, "ToStreamAsync", move |capability, info| { - let activator = Arc::new(scope.sync_activator_for(&info.address[..])); - - let mut cap_set = CapabilitySet::from_elem(capability); - - move |output| { - let waker = futures_util::task::waker_ref(&activator); - let mut context = Context::from_waker(&waker); - - // Consume all the ready items of the source_stream and issue them to the operator - while let Poll::Ready(item) = self.as_mut().poll_next(&mut context) { - match item { - Some(Event::Progress(time)) => { - cap_set.downgrade(time); - } - Some(Event::Message(time, data)) => { - output.session(&cap_set.delayed(&time)).give(data); - } - None => { - cap_set.downgrade(&[]); - break; - } - } - } - } - }) - } -} diff --git a/timely/src/scheduling/activate.rs b/timely/src/scheduling/activate.rs index e91c87a99..e86f44355 100644 --- a/timely/src/scheduling/activate.rs +++ b/timely/src/scheduling/activate.rs @@ -1,14 +1,12 @@ //! Parking and unparking timely fibers. use std::rc::Rc; -use std::sync::Arc; use std::cell::RefCell; use std::thread::Thread; use std::collections::BinaryHeap; use std::time::{Duration, Instant}; use std::cmp::Reverse; use crossbeam_channel::{Sender, Receiver}; -use futures_util::task::ArcWake; /// Methods required to act as a timely scheduler. /// @@ -274,12 +272,6 @@ impl SyncActivator { } } -impl ArcWake for SyncActivator { - fn wake_by_ref(arc_self: &Arc) { - arc_self.activate().unwrap(); - } -} - /// The error returned when activation fails across thread boundaries because /// the receiving end has hung up. #[derive(Clone, Copy, Debug)]