diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 6ef17ab31..976023a19 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -73,14 +73,13 @@ implu64+'static> ParallelizationC where C: Data + Container + PushPartitioned, { - // TODO: The closure in the type prevents us from naming it. - // Could specialize `ExchangePusher` to a time-free version. - type Pusher = Box>>; - type Puller = Box>>; - fn connect(mut self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { + type Pusher = ExchangePusher>>>, F>; + type Puller = LogPuller>>>; + + fn connect(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (Box::new(ExchangePusher::new(senders, move |_, d| (self.hash_func)(d))), Box::new(LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))) + (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 6ddca7332..9ea271d31 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -7,7 +7,7 @@ use crate::dataflow::channels::{BundleCore, Message}; // TODO : Software write combining /// Distributes records among target pushees according to a distribution function. -pub struct Exchange>, H: FnMut(&T, &D) -> u64> { +pub struct Exchange>, H: FnMut(&D) -> u64> { pushers: Vec

, buffers: Vec, current: Option, @@ -15,7 +15,7 @@ pub struct Exchange>, H: FnMut(&T, phantom: std::marker::PhantomData, } -impl>, H: FnMut(&T, &D)->u64> Exchange { +impl>, H: FnMut(&D) -> u64> Exchange { /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. pub fn new(pushers: Vec

, key: H) -> Exchange { let mut buffers = vec![]; @@ -40,7 +40,7 @@ impl>, H: FnMut(&T, &D } } -impl>, H: FnMut(&T, &D)->u64> Push> for Exchange +impl>, H: FnMut(&D) -> u64> Push> for Exchange where C: PushPartitioned { @@ -72,7 +72,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| ((hash_func)(time, datum) & mask) as usize, + move |datum| ((hash_func)(datum) & mask) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); } @@ -84,7 +84,7 @@ where let pushers = &mut self.pushers; data.push_partitioned( &mut self.buffers, - move |datum| ((hash_func)(time, datum) % num_pushers) as usize, + move |datum| ((hash_func)(datum) % num_pushers) as usize, |index, buffer| { Message::push_at(buffer, time.clone(), &mut pushers[index]); }