diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 5074014bd..3674fc49e 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -11,6 +11,7 @@ pub mod input; pub mod inspect; pub mod map; pub mod ok_err; +pub mod partition; pub mod probe; pub mod rc; pub mod reclock; @@ -27,6 +28,7 @@ pub use input::Input; pub use inspect::{Inspect, InspectCore}; pub use map::Map; pub use ok_err::OkErr; +pub use partition::Partition; pub use probe::Probe; pub use to_stream::{ToStream, ToStreamBuilder}; pub use reclock::Reclock; diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs new file mode 100644 index 000000000..e5c5ce2bd --- /dev/null +++ b/timely/src/dataflow/operators/core/partition.rs @@ -0,0 +1,75 @@ +//! Partition a stream of records into multiple streams. + +use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer}; + +use crate::dataflow::channels::pact::Pipeline; +use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::dataflow::{Scope, StreamCore}; +use crate::Data; + +/// Partition a stream of records into multiple streams. +pub trait Partition<G: Scope, C: Container> { + /// Produces `parts` output streams, containing records produced and assigned by `route`. + /// + /// # Examples + /// ``` + /// use timely::dataflow::operators::ToStream; + /// use timely::dataflow::operators::core::{Partition, Inspect}; + /// + /// timely::example(|scope| { + /// let streams = (0..10).to_stream(scope) + /// .partition(3, |x| (x % 3, x)); + /// + /// for (idx, stream) in streams.into_iter().enumerate() { + /// stream + /// .container::<Vec<_>>() + /// .inspect(move |x| println!("seen {idx}: {x:?}")); + /// } + /// }); + /// ``` + fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>> + where + CB: ContainerBuilder, + CB::Container: SizableContainer + PushInto<D2> + Data, + F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; +} + +impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> { + fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>> + where + CB: ContainerBuilder, + CB::Container: SizableContainer + PushInto<D2> + Data, + F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, + { + let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); + + let mut input = builder.new_input(self, Pipeline); + let mut outputs = Vec::with_capacity(parts as usize); + let mut streams = Vec::with_capacity(parts as usize); + + for _ in 0..parts { + let (output, stream) = builder.new_output(); + outputs.push(output); + streams.push(stream); + } + + builder.build(move |_| { + move |_frontiers| { + let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>(); + input.for_each(|time, data| { + let mut sessions = handles + .iter_mut() + .map(|h| h.session(&time)) + .collect::<Vec<_>>(); + + for datum in data.drain() { + let (part, datum2) = route(datum); + sessions[part as usize].give(datum2); + } + }); + } + }); + + streams + } +} diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index 39fc427cc..6388efc63 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -1,7 +1,7 @@ //! Partition a stream of records into multiple streams. -use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; +use crate::container::CapacityContainerBuilder; +use crate::dataflow::operators::core::Partition as PartitionCore; use crate::dataflow::{Scope, Stream}; use crate::Data; @@ -27,32 +27,6 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> { impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> { fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> { - let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); - - let mut input = builder.new_input(self, Pipeline); - let mut outputs = Vec::with_capacity(parts as usize); - let mut streams = Vec::with_capacity(parts as usize); - - for _ in 0 .. parts { - let (output, stream) = builder.new_output(); - outputs.push(output); - streams.push(stream); - } - - builder.build(move |_| { - move |_frontiers| { - let mut handles = outputs.iter_mut().map(|o| o.activate()).collect::<Vec<_>>(); - input.for_each(|time, data| { - let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>(); - - for datum in data.drain(..) { - let (part, datum2) = route(datum); - sessions[part as usize].give(datum2); - } - }); - } - }); - - streams + PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route) } }