From 7bfbefe405d45219e67d5d7d9632646146848337 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 24 Nov 2024 15:01:22 -0500 Subject: [PATCH] Remove Bundle type and have Message implement Bytesable directly --- timely/src/dataflow/channels/mod.rs | 29 +++++++++++--- timely/src/dataflow/channels/pact.rs | 38 +++++++++---------- .../src/dataflow/channels/pullers/counter.rs | 12 +++--- .../src/dataflow/channels/pushers/buffer.rs | 24 ++++++------ .../src/dataflow/channels/pushers/counter.rs | 10 ++--- .../src/dataflow/channels/pushers/exchange.rs | 14 +++---- timely/src/dataflow/channels/pushers/tee.rs | 10 ++--- .../operators/core/capture/capture.rs | 4 +- .../src/dataflow/operators/core/enterleave.rs | 28 +++++++------- timely/src/dataflow/operators/core/probe.rs | 4 +- .../src/dataflow/operators/generic/handles.rs | 28 +++++++------- timely/src/dataflow/stream.rs | 6 +-- 12 files changed, 112 insertions(+), 95 deletions(-) diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 3a1e23f18..7031487aa 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -11,9 +11,6 @@ pub mod pullers; /// Parallelization contracts, describing how data must be exchanged between operators. pub mod pact; -/// The input to and output from timely dataflow communication channels. -pub type Bundle = crate::Bincode>; - /// A serializable representation of timestamped data. #[derive(Clone, Serialize, Deserialize)] pub struct Message { @@ -44,17 +41,37 @@ impl Message { /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher /// leaves in place, or the container's default element. The buffer is cleared. #[inline] - pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) { + pub fn push_at>>(buffer: &mut C, time: T, pusher: &mut P) { let data = ::std::mem::take(buffer); let message = Message::new(time, data, 0, 0); - let mut bundle = Some(Bundle::from(message)); + let mut bundle = Some(Message::from(message)); pusher.push(&mut bundle); if let Some(message) = bundle { - *buffer = message.payload.data; + *buffer = message.data; buffer.clear(); } } } + +// Instructions for serialization of `Message`. +// Intended to swap out the constraint on `C` for `C: Bytesable`. +impl crate::communication::Bytesable for Message +where + T: Serialize + for<'a> Deserialize<'a>, + C: Serialize + for<'a> Deserialize<'a>, +{ + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { + ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed") + } + + fn length_in_bytes(&self) -> usize { + ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize + } + + fn into_bytes(&self, writer: &mut W) { + ::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed"); + } +} \ No newline at end of file diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 00ef24058..a744ab078 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::container::PushPartitioned; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::Message; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; @@ -24,9 +24,9 @@ use crate::ExchangeData; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { /// Type implementing `Push` produced by this pact. - type Pusher: Push>+'static; + type Pusher: Push>+'static; /// Type implementing `Pull` produced by this pact. - type Puller: Pull>+'static; + type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller); } @@ -36,10 +36,10 @@ pub trait ParallelizationContract { pub struct Pipeline; impl ParallelizationContract for Pipeline { - type Pusher = LogPusher>>; - type Puller = LogPuller>>; + type Pusher = LogPusher>>; + type Puller = LogPuller>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (pusher, puller) = allocator.pipeline::>(identifier, address); + let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), LogPuller::new(puller, allocator.index(), identifier, logging)) } @@ -71,11 +71,11 @@ where C: ExchangeData + PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { - type Pusher = ExchangePusher>>>, H>; - type Puller = LogPuller>>>; + type Pusher = ExchangePusher>>>, H>; + type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); + let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } @@ -89,7 +89,7 @@ impl Debug for ExchangeCore { /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. #[derive(Debug)] -pub struct LogPusher>> { +pub struct LogPusher>> { pusher: P, channel: usize, counter: usize, @@ -99,7 +99,7 @@ pub struct LogPusher>> { logging: Option, } -impl>> LogPusher { +impl>> LogPusher { /// Allocates a new pusher. pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { LogPusher { @@ -114,16 +114,16 @@ impl>> LogPusher { } } -impl>> Push> for LogPusher { +impl>> Push> for LogPusher { #[inline] - fn push(&mut self, pair: &mut Option>) { + fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; // Stamp the sequence number and source. // FIXME: Awkward moment/logic. - bundle.payload.seq = self.counter - 1; - bundle.payload.from = self.source; + bundle.seq = self.counter - 1; + bundle.from = self.source; if let Some(logger) = self.logging.as_ref() { logger.log(MessagesEvent { @@ -143,7 +143,7 @@ impl>> Push> for LogPusher` puller to provide a `Pull<(T, Content)>`. #[derive(Debug)] -pub struct LogPuller>> { +pub struct LogPuller>> { puller: P, channel: usize, index: usize, @@ -151,7 +151,7 @@ pub struct LogPuller>> { logging: Option, } -impl>> LogPuller { +impl>> LogPuller { /// Allocates a new `Puller`. pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { LogPuller { @@ -164,9 +164,9 @@ impl>> LogPuller { } } -impl>> Pull> for LogPuller { +impl>> Pull> for LogPuller { #[inline] - fn pull(&mut self) -> &mut Option> { + fn pull(&mut self) -> &mut Option> { let result = self.puller.pull(); if let Some(bundle) = result { let channel = self.channel; diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 8efba9318..1fdcf228d 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -3,13 +3,13 @@ use std::rc::Rc; use std::cell::RefCell; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::Message; use crate::progress::ChangeBatch; use crate::communication::Pull; use crate::Container; /// A wrapper which accounts records pulled past in a shared count map. -pub struct Counter>> { +pub struct Counter>> { pullable: P, consumed: Rc>>, phantom: ::std::marker::PhantomData, @@ -36,15 +36,15 @@ impl Drop for ConsumedGuard { } } -impl>> Counter { +impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] - pub fn next(&mut self) -> Option<&mut Bundle> { + pub fn next(&mut self) -> Option<&mut Message> { self.next_guarded().map(|(_guard, bundle)| bundle) } #[inline] - pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Bundle)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut Message)> { if let Some(message) = self.pullable.pull() { let guard = ConsumedGuard { consumed: Rc::clone(&self.consumed), @@ -57,7 +57,7 @@ impl>> Counter } } -impl>> Counter { +impl>> Counter { /// Allocates a new `Counter` from a boxed puller. pub fn new(pullable: P) -> Self { Counter { diff --git a/timely/src/dataflow/channels/pushers/buffer.rs b/timely/src/dataflow/channels/pushers/buffer.rs index 1dd0ededd..48feb14ab 100644 --- a/timely/src/dataflow/channels/pushers/buffer.rs +++ b/timely/src/dataflow/channels/pushers/buffer.rs @@ -3,7 +3,7 @@ use crate::communication::Push; use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto}; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::Message; use crate::dataflow::operators::Capability; use crate::progress::Timestamp; use crate::Container; @@ -44,7 +44,7 @@ impl Buffer { } } -impl>> Buffer, P> where T: Eq+Clone { +impl>> Buffer, P> where T: Eq+Clone { /// Returns a `Session`, which accepts data to send at the associated time #[inline] pub fn session(&mut self, time: &T) -> Session, P> { @@ -66,7 +66,7 @@ impl>> Buffer>> Buffer where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { /// Returns a `Session`, which accepts data to send at the associated time pub fn session_with_builder(&mut self, time: &T) -> Session { if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); } @@ -85,7 +85,7 @@ impl>> Buffer>> Buffer where T: Eq+Clone { +impl>> Buffer where T: Eq+Clone { /// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush. pub fn cease(&mut self) { self.flush(); @@ -115,7 +115,7 @@ impl PushInto for Buffer where T: Eq+Clone, CB: ContainerBuilder + PushInto, - P: Push> + P: Push> { #[inline] fn push_into(&mut self, item: D) { @@ -136,7 +136,7 @@ pub struct Session<'a, T, CB, P> { impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder, P> where T: Eq + Clone + 'a, - P: Push> + 'a, + P: Push> + 'a, { /// Provide a container at the time specified by the [Session]. pub fn give_container(&mut self, container: &mut C) { @@ -148,7 +148,7 @@ impl<'a, T, CB, P> Session<'a, T, CB, P> where T: Eq + Clone + 'a, CB: ContainerBuilder + 'a, - P: Push> + 'a + P: Push> + 'a { /// Access the builder. Immutable access to prevent races with flushing /// the underlying buffer. @@ -179,7 +179,7 @@ impl<'a, T, CB, P, D> PushInto for Session<'a, T, CB, P> where T: Eq + Clone + 'a, CB: ContainerBuilder + PushInto + 'a, - P: Push> + 'a, + P: Push> + 'a, { #[inline] fn push_into(&mut self, item: D) { @@ -192,7 +192,7 @@ pub struct AutoflushSession<'a, T, CB, P> where T: Timestamp + 'a, CB: ContainerBuilder + 'a, - P: Push> + 'a, + P: Push> + 'a, { /// A reference to the underlying buffer. buffer: &'a mut Buffer, @@ -204,7 +204,7 @@ impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P> where T: Timestamp + 'a, CB: ContainerBuilder + 'a, - P: Push> + 'a, + P: Push> + 'a, { /// Transmits a single record. #[inline] @@ -231,7 +231,7 @@ impl<'a, T, CB, P, D> PushInto for AutoflushSession<'a, T, CB, P> where T: Timestamp + 'a, CB: ContainerBuilder + PushInto + 'a, - P: Push> + 'a, + P: Push> + 'a, { #[inline] fn push_into(&mut self, item: D) { @@ -243,7 +243,7 @@ impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P> where T: Timestamp + 'a, CB: ContainerBuilder + 'a, - P: Push> + 'a, + P: Push> + 'a, { fn drop(&mut self) { self.buffer.cease(); diff --git a/timely/src/dataflow/channels/pushers/counter.rs b/timely/src/dataflow/channels/pushers/counter.rs index 11b72946a..a6a16ce5d 100644 --- a/timely/src/dataflow/channels/pushers/counter.rs +++ b/timely/src/dataflow/channels/pushers/counter.rs @@ -5,21 +5,21 @@ use std::rc::Rc; use std::cell::RefCell; use crate::progress::{ChangeBatch, Timestamp}; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::Message; use crate::communication::Push; use crate::Container; /// A wrapper which updates shared `produced` based on the number of records pushed. #[derive(Debug)] -pub struct Counter>> { +pub struct Counter>> { pushee: P, produced: Rc>>, phantom: PhantomData, } -impl Push> for Counter where P: Push> { +impl Push> for Counter where P: Push> { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64); } @@ -31,7 +31,7 @@ impl Push> for Counter wher } } -impl>> Counter where T : Ord+Clone+'static { +impl>> Counter where T : Ord+Clone+'static { /// Allocates a new `Counter` from a pushee and shared counts. pub fn new(pushee: P) -> Counter { Counter { diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 4545933ed..c5e6020f1 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -2,12 +2,12 @@ use crate::communication::Push; use crate::container::PushPartitioned; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::Message; use crate::{Container, Data}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H> +pub struct Exchange>, H> where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -17,7 +17,7 @@ where hash_func: H, } -impl>, H> Exchange +impl>, H> Exchange where for<'a> H: FnMut(&C::Item<'a>) -> u64 { @@ -44,21 +44,21 @@ where } } -impl>, H, > Push> for Exchange +impl>, H, > Push> for Exchange where C: PushPartitioned, for<'a> H: FnMut(&C::Item<'a>) -> u64 { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); } else if let Some(message) = message { - let time = &message.payload.time; - let data = &mut message.payload.data; + let time = &message.time; + let data = &mut message.data; // if the time isn't right, flush everything. if self.current.as_ref().map_or(false, |x| x != time) { diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index f5cbfb226..8b7a4b906 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -4,12 +4,12 @@ use std::cell::RefCell; use std::fmt::{self, Debug}; use std::rc::Rc; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::Message; use crate::communication::Push; use crate::{Container, Data}; -type PushList = Rc>>>>>; +type PushList = Rc>>>>>; /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. pub struct Tee { @@ -17,9 +17,9 @@ pub struct Tee { shared: PushList, } -impl Push> for Tee { +impl Push> for Tee { #[inline] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { let mut pushers = self.shared.borrow_mut(); if let Some(message) = message { for index in 1..pushers.len() { @@ -86,7 +86,7 @@ pub struct TeeHelper { impl TeeHelper { /// Adds a new `Push` implementor to the list of recipients shared with a `Stream`. - pub fn add_pusher>+'static>(&self, pusher: P) { + pub fn add_pusher>+'static>(&self, pusher: P) { self.shared.borrow_mut().push(Box::new(pusher)); } } diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs index 2a9ff4f5d..28df65208 100644 --- a/timely/src/dataflow/operators/core/capture/capture.rs +++ b/timely/src/dataflow/operators/core/capture/capture.rs @@ -136,8 +136,8 @@ impl Capture for StreamCore { // turn each received message into an event. while let Some(message) = input.next() { - let time = &message.payload.time; - let data = &mut message.payload.data; + let time = &message.time; + let data = &mut message.data; let vector = std::mem::take(data); event_pusher.push(Event::Messages(time.clone(), vector)); } diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs index ce796546f..020e763c7 100644 --- a/timely/src/dataflow/operators/core/enterleave.rs +++ b/timely/src/dataflow/operators/core/enterleave.rs @@ -28,7 +28,7 @@ use crate::progress::{Source, Target}; use crate::{Container, Data}; use crate::communication::Push; use crate::dataflow::channels::pushers::{Counter, Tee}; -use crate::dataflow::channels::{Bundle, Message}; +use crate::dataflow::channels::Message; use crate::worker::AsWorker; use crate::dataflow::{StreamCore, Scope}; @@ -137,15 +137,15 @@ struct IngressNub, TContain active: bool, } -impl, TContainer: Container> Push> for IngressNub { - fn push(&mut self, element: &mut Option>) { +impl, TContainer: Container> Push> for IngressNub { + fn push(&mut self, element: &mut Option>) { if let Some(message) = element { - let outer_message = &mut message.payload; + let outer_message = message; let data = ::std::mem::take(&mut outer_message.data); - let mut inner_message = Some(Bundle::from(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); + let mut inner_message = Some(Message::from(Message::new(TInner::to_inner(outer_message.time.clone()), data, 0, 0))); self.targets.push(&mut inner_message); if let Some(inner_message) = inner_message { - outer_message.data = inner_message.payload.data; + outer_message.data = inner_message.data; } self.active = true; } @@ -165,16 +165,16 @@ struct EgressNub, TContaine phantom: PhantomData, } -impl Push> for EgressNub +impl Push> for EgressNub where TOuter: Timestamp, TInner: Timestamp+Refines, TContainer: Data { - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { if let Some(message) = message { - let inner_message = &mut message.payload; + let inner_message = message; let data = ::std::mem::take(&mut inner_message.data); - let mut outer_message = Some(Bundle::from(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); + let mut outer_message = Some(Message::from(Message::new(inner_message.time.clone().to_outer(), data, 0, 0))); self.targets.push(&mut outer_message); if let Some(outer_message) = outer_message { - inner_message.data = outer_message.payload.data; + inner_message.data = outer_message.data; } } else { self.targets.done(); } @@ -207,12 +207,12 @@ impl

LogPusher

{ } } -impl Push> for LogPusher

+impl Push> for LogPusher

where C: Container, - P: Push>, + P: Push>, { - fn push(&mut self, element: &mut Option>) { + fn push(&mut self, element: &mut Option>) { if let Some(bundle) = element { let send_event = MessagesEvent { is_send: true, diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 99902966d..b69485e82 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -113,8 +113,8 @@ impl Probe for StreamCore { } while let Some(message) = input.next() { - let time = &message.payload.time; - let data = &mut message.payload.data; + let time = &message.time; + let data = &mut message.data; output.session(time).give_container(data); } output.cease(); diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 5b2f552d0..586cec88a 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -13,7 +13,7 @@ use crate::progress::frontier::MutableAntichain; use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::channels::pushers::buffer::{Buffer, Session}; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::Message; use crate::communication::{Push, Pull}; use crate::Container; use crate::container::{ContainerBuilder, CapacityContainerBuilder}; @@ -23,7 +23,7 @@ use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. -pub struct InputHandleCore>> { +pub struct InputHandleCore>> { pull_counter: PullCounter, internal: Rc>>>>>, /// Timestamp summaries from this input to each output. @@ -38,7 +38,7 @@ pub struct InputHandleCore>> { pub type InputHandle = InputHandleCore, P>; /// Handle to an operator's input stream and frontier. -pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull>+'a> { +pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull>+'a> { /// The underlying input handle. pub handle: &'a mut InputHandleCore, /// The frontier as reported by timely progress tracking. @@ -48,7 +48,7 @@ pub struct FrontieredInputHandleCore<'a, T: Timestamp, C: Container+'a, P: Pull< /// Handle to an operator's input stream and frontier, specialized to vectors. pub type FrontieredInputHandle<'a, T, D, P> = FrontieredInputHandleCore<'a, T, Vec, P>; -impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore { +impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore { /// Reads the next input buffer (at some timestamp `t`) and a corresponding capability for `t`. /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. @@ -58,7 +58,7 @@ impl<'a, T: Timestamp, C: Container, P: Pull>> InputHandleCore>> InputHandleCore>+'a> FrontieredInputHandleCore<'a, T, C, P> { +impl<'a, T: Timestamp, C: Container, P: Pull>+'a> FrontieredInputHandleCore<'a, T, C, P> { /// Allocate a new frontiered input handle. pub fn new(handle: &'a mut InputHandleCore, frontier: &'a MutableAntichain) -> Self { FrontieredInputHandleCore { @@ -140,13 +140,13 @@ impl<'a, T: Timestamp, C: Container, P: Pull>+'a> FrontieredInputHa } } -pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { +pub fn _access_pull_counter>>(input: &mut InputHandleCore) -> &mut PullCounter { &mut input.pull_counter } /// Constructs an input handle. /// Declared separately so that it can be kept private when `InputHandle` is re-exported. -pub fn new_input_handle>>( +pub fn new_input_handle>>( pull_counter: PullCounter, internal: Rc>>>>>, summaries: Rc>>>, @@ -166,12 +166,12 @@ pub fn new_input_handle>>( /// than with an `OutputHandle`, whose methods ensure that capabilities are used and that the /// pusher is flushed (via the `cease` method) once it is no longer used. #[derive(Debug)] -pub struct OutputWrapper>> { +pub struct OutputWrapper>> { push_buffer: Buffer>, internal_buffer: Rc>>, } -impl>> OutputWrapper { +impl>> OutputWrapper { /// Creates a new output wrapper from a push buffer. pub fn new(push_buffer: Buffer>, internal_buffer: Rc>>) -> Self { OutputWrapper { @@ -192,7 +192,7 @@ impl>> Outp } /// Handle to an operator's output stream. -pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push>+'a> { +pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push>+'a> { push_buffer: &'a mut Buffer>, internal_buffer: &'a Rc>>, } @@ -200,7 +200,7 @@ pub struct OutputHandleCore<'a, T: Timestamp, CB: ContainerBuilder+'a, P: Push = OutputHandleCore<'a, T, CapacityContainerBuilder>, P>; -impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> OutputHandleCore<'a, T, CB, P> { +impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> OutputHandleCore<'a, T, CB, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -235,7 +235,7 @@ impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> } } -impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { +impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, T, CapacityContainerBuilder, P> { /// Obtains a session that can send data at the timestamp associated with capability `cap`. /// /// In order to send data at a future timestamp, obtain a capability for the new timestamp @@ -264,7 +264,7 @@ impl<'a, T: Timestamp, C: Container, P: Push>> OutputHandleCore<'a, } } -impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> Drop for OutputHandleCore<'a, T, CB, P> { +impl<'a, T: Timestamp, CB: ContainerBuilder, P: Push>> Drop for OutputHandleCore<'a, T, CB, P> { fn drop(&mut self) { self.push_buffer.cease(); } diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs index 653976377..92e0f19a1 100644 --- a/timely/src/dataflow/stream.rs +++ b/timely/src/dataflow/stream.rs @@ -9,7 +9,7 @@ use crate::progress::{Source, Target}; use crate::communication::Push; use crate::dataflow::Scope; use crate::dataflow::channels::pushers::tee::TeeHelper; -use crate::dataflow::channels::Bundle; +use crate::dataflow::channels::Message; use std::fmt::{self, Debug}; use crate::Container; @@ -25,7 +25,7 @@ pub struct StreamCore { name: Source, /// The `Scope` containing the stream. scope: S, - /// Maintains a list of Push> interested in the stream's output. + /// Maintains a list of Push> interested in the stream's output. ports: TeeHelper, } @@ -37,7 +37,7 @@ impl StreamCore { /// /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the /// records should actually be sent. The identifier is unique to the edge and is used only for logging purposes. - pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { + pub fn connect_to>+'static>(&self, target: Target, pusher: P, identifier: usize) { let mut logging = self.scope().logging(); logging.as_mut().map(|l| l.log(crate::logging::ChannelsEvent {