diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 7fd4dca12..625de6cd6 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -32,54 +32,54 @@ impl Generic { /// The index of the worker out of `(0..self.peers())`. pub fn index(&self) -> usize { match self { - &Generic::Thread(ref t) => t.index(), - &Generic::Process(ref p) => p.index(), - &Generic::ProcessBinary(ref pb) => pb.index(), - &Generic::ZeroCopy(ref z) => z.index(), + Generic::Thread(t) => t.index(), + Generic::Process(p) => p.index(), + Generic::ProcessBinary(pb) => pb.index(), + Generic::ZeroCopy(z) => z.index(), } } /// The number of workers. pub fn peers(&self) -> usize { match self { - &Generic::Thread(ref t) => t.peers(), - &Generic::Process(ref p) => p.peers(), - &Generic::ProcessBinary(ref pb) => pb.peers(), - &Generic::ZeroCopy(ref z) => z.peers(), + Generic::Thread(t) => t.peers(), + Generic::Process(p) => p.peers(), + Generic::ProcessBinary(pb) => pb.peers(), + Generic::ZeroCopy(z) => z.peers(), } } /// Constructs several send endpoints and one receive endpoint. fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) { match self { - &mut Generic::Thread(ref mut t) => t.allocate(identifier), - &mut Generic::Process(ref mut p) => p.allocate(identifier), - &mut Generic::ProcessBinary(ref mut pb) => pb.allocate(identifier), - &mut Generic::ZeroCopy(ref mut z) => z.allocate(identifier), + Generic::Thread(t) => t.allocate(identifier), + Generic::Process(p) => p.allocate(identifier), + Generic::ProcessBinary(pb) => pb.allocate(identifier), + Generic::ZeroCopy(z) => z.allocate(identifier), } } /// Perform work before scheduling operators. fn receive(&mut self) { match self { - &mut Generic::Thread(ref mut t) => t.receive(), - &mut Generic::Process(ref mut p) => p.receive(), - &mut Generic::ProcessBinary(ref mut pb) => pb.receive(), - &mut Generic::ZeroCopy(ref mut z) => z.receive(), + Generic::Thread(t) => t.receive(), + Generic::Process(p) => p.receive(), + Generic::ProcessBinary(pb) => pb.receive(), + Generic::ZeroCopy(z) => z.receive(), } } /// Perform work after scheduling operators. pub fn release(&mut self) { match self { - &mut Generic::Thread(ref mut t) => t.release(), - &mut Generic::Process(ref mut p) => p.release(), - &mut Generic::ProcessBinary(ref mut pb) => pb.release(), - &mut Generic::ZeroCopy(ref mut z) => z.release(), + Generic::Thread(t) => t.release(), + Generic::Process(p) => p.release(), + Generic::ProcessBinary(pb) => pb.release(), + Generic::ZeroCopy(z) => z.release(), } } fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { match self { - &Generic::Thread(ref t) => t.events(), - &Generic::Process(ref p) => p.events(), - &Generic::ProcessBinary(ref pb) => pb.events(), - &Generic::ZeroCopy(ref z) => z.events(), + Generic::Thread(ref t) => t.events(), + Generic::Process(ref p) => p.events(), + Generic::ProcessBinary(ref pb) => pb.events(), + Generic::ZeroCopy(ref z) => z.events(), } } } @@ -96,10 +96,10 @@ impl Allocate for Generic { fn events(&self) -> &Rc<RefCell<VecDeque<(usize, Event)>>> { self.events() } fn await_events(&self, _duration: Option<std::time::Duration>) { match self { - &Generic::Thread(ref t) => t.await_events(_duration), - &Generic::Process(ref p) => p.await_events(_duration), - &Generic::ProcessBinary(ref pb) => pb.await_events(_duration), - &Generic::ZeroCopy(ref z) => z.await_events(_duration), + Generic::Thread(t) => t.await_events(_duration), + Generic::Process(p) => p.await_events(_duration), + Generic::ProcessBinary(pb) => pb.await_events(_duration), + Generic::ZeroCopy(z) => z.await_events(_duration), } } } diff --git a/communication/src/allocator/process.rs b/communication/src/allocator/process.rs index 8894b730f..132bc6de5 100644 --- a/communication/src/allocator/process.rs +++ b/communication/src/allocator/process.rs @@ -38,7 +38,7 @@ impl AllocateBuilder for ProcessBuilder { let buzzer = Buzzer::new(); worker.send(buzzer).expect("Failed to send buzzer"); } - let mut buzzers = Vec::new(); + let mut buzzers = Vec::with_capacity(self.buzzers_recv.len()); for worker in self.buzzers_recv.iter() { buzzers.push(worker.recv().expect("Failed to recv buzzer")); } @@ -69,19 +69,19 @@ pub struct Process { impl Process { /// Access the wrapped inner allocator. - pub fn inner<'a>(&'a mut self) -> &'a mut Thread { &mut self.inner } + pub fn inner(&mut self) -> &mut Thread { &mut self.inner } /// Allocate a list of connected intra-process allocators. pub fn new_vector(peers: usize) -> Vec<ProcessBuilder> { - let mut counters_send = Vec::new(); - let mut counters_recv = Vec::new(); + let mut counters_send = Vec::with_capacity(peers); + let mut counters_recv = Vec::with_capacity(peers); for _ in 0 .. peers { let (send, recv) = crossbeam_channel::unbounded(); counters_send.push(send); counters_recv.push(recv); } - let channels = Arc::new(Mutex::new(HashMap::new())); + let channels = Arc::new(Mutex::new(HashMap::with_capacity(peers))); // Allocate matrix of buzzer send and recv endpoints. let (buzzers_send, buzzers_recv) = crate::promise_futures(peers, peers); @@ -116,23 +116,23 @@ impl Allocate for Process { // first worker that enters this critical section // ensure exclusive access to shared list of channels - let mut channels = self.channels.lock().ok().expect("mutex error?"); + let mut channels = self.channels.lock().expect("mutex error?"); let (sends, recv, empty) = { // we may need to alloc a new channel ... let entry = channels.entry(identifier).or_insert_with(|| { - let mut pushers = Vec::new(); - let mut pullers = Vec::new(); - for index in 0 .. self.peers { + let mut pushers = Vec::with_capacity(self.peers); + let mut pullers = Vec::with_capacity(self.peers); + for buzzer in self.buzzers.iter() { let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = crossbeam_channel::unbounded(); // TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter. - pushers.push((Pusher { target: s }, self.buzzers[index].clone())); + pushers.push((Pusher { target: s }, buzzer.clone())); pullers.push(Puller { source: r, current: None }); } - let mut to_box = Vec::new(); + let mut to_box = Vec::with_capacity(pullers.len()); for recv in pullers.into_iter() { to_box.push(Some((pushers.clone(), recv))); } @@ -164,8 +164,8 @@ impl Allocate for Process { let sends = sends.into_iter() - .enumerate() - .map(|(i,(s,b))| CountPusher::new(s, identifier, self.counters_send[i].clone(), b)) + .zip(self.counters_send.iter()) + .map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b)) .map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>) .collect::<Vec<_>>(); diff --git a/communication/src/allocator/thread.rs b/communication/src/allocator/thread.rs index de2f207bb..ba5407e4d 100644 --- a/communication/src/allocator/thread.rs +++ b/communication/src/allocator/thread.rs @@ -68,7 +68,7 @@ impl Thread { let pusher = Pusher { target: shared.clone() }; let pusher = CountPusher::new(pusher, identifier, events.clone()); let puller = Puller { source: shared, current: None }; - let puller = CountPuller::new(puller, identifier, events.clone()); + let puller = CountPuller::new(puller, identifier, events); (pusher, puller) } } diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 2722e6a79..dd2815a50 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -127,7 +127,7 @@ impl Allocate for ProcessAllocator { } self.channel_id_bound = Some(identifier); - let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new(); + let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers()); for target_index in 0 .. self.peers() { diff --git a/communication/src/allocator/zero_copy/initialize.rs b/communication/src/allocator/zero_copy/initialize.rs index 4abaa622b..97ade2c78 100644 --- a/communication/src/allocator/zero_copy/initialize.rs +++ b/communication/src/allocator/zero_copy/initialize.rs @@ -75,58 +75,52 @@ pub fn initialize_networking_from_sockets( let mut promises_iter = promises.into_iter(); let mut futures_iter = futures.into_iter(); - let mut send_guards = Vec::new(); - let mut recv_guards = Vec::new(); + let mut send_guards = Vec::with_capacity(sockets.len()); + let mut recv_guards = Vec::with_capacity(sockets.len()); // for each process, if a stream exists (i.e. not local) ... - for index in 0..sockets.len() { - - if let Some(stream) = sockets[index].take() { - // remote process - - let remote_recv = promises_iter.next().unwrap(); - - { - let log_sender = log_sender.clone(); - let stream = stream.try_clone()?; - let join_guard = - ::std::thread::Builder::new() - .name(format!("timely:send-{}", index)) - .spawn(move || { - - let logger = log_sender(CommunicationSetup { - process: my_index, - sender: true, - remote: Some(index), - }); - - send_loop(stream, remote_recv, my_index, index, logger); - })?; - - send_guards.push(join_guard); - } - - let remote_send = futures_iter.next().unwrap(); - - { - // let remote_sends = remote_sends.clone(); - let log_sender = log_sender.clone(); - let stream = stream.try_clone()?; - let join_guard = - ::std::thread::Builder::new() - .name(format!("timely:recv-{}", index)) - .spawn(move || { - let logger = log_sender(CommunicationSetup { - process: my_index, - sender: false, - remote: Some(index), - }); - recv_loop(stream, remote_send, threads * my_index, my_index, index, logger); - })?; - - recv_guards.push(join_guard); - } + for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) { + let remote_recv = promises_iter.next().unwrap(); + + { + let log_sender = log_sender.clone(); + let stream = stream.try_clone()?; + let join_guard = + ::std::thread::Builder::new() + .name(format!("timely:send-{}", index)) + .spawn(move || { + + let logger = log_sender(CommunicationSetup { + process: my_index, + sender: true, + remote: Some(index), + }); + + send_loop(stream, remote_recv, my_index, index, logger); + })?; + + send_guards.push(join_guard); + } + let remote_send = futures_iter.next().unwrap(); + + { + // let remote_sends = remote_sends.clone(); + let log_sender = log_sender.clone(); + let stream = stream.try_clone()?; + let join_guard = + ::std::thread::Builder::new() + .name(format!("timely:recv-{}", index)) + .spawn(move || { + let logger = log_sender(CommunicationSetup { + process: my_index, + sender: false, + remote: Some(index), + }); + recv_loop(stream, remote_send, threads * my_index, my_index, index, logger); + })?; + + recv_guards.push(join_guard); } } diff --git a/communication/src/allocator/zero_copy/push_pull.rs b/communication/src/allocator/zero_copy/push_pull.rs index da8c287ac..fe423c21a 100644 --- a/communication/src/allocator/zero_copy/push_pull.rs +++ b/communication/src/allocator/zero_copy/push_pull.rs @@ -28,8 +28,8 @@ impl<T, P: BytesPush> Pusher<T, P> { /// Creates a new `Pusher` from a header and shared byte buffer. pub fn new(header: MessageHeader, sender: Rc<RefCell<SendEndpoint<P>>>) -> Pusher<T, P> { Pusher { - header: header, - sender: sender, + header, + sender, phantom: ::std::marker::PhantomData, } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 1d9a455e5..5f15878ba 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -136,7 +136,7 @@ impl<G: Scope> OperatorBuilder<G> { L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static { // create capabilities, discard references to their creation. - let mut capabilities = Vec::new(); + let mut capabilities = Vec::with_capacity(self.internal.borrow().len()); for output_index in 0 .. self.internal.borrow().len() { let borrow = &self.internal.borrow()[output_index]; capabilities.push(mint_capability(G::Timestamp::minimum(), borrow.clone())); diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/partition.rs index 491517a4b..cc5097530 100644 --- a/timely/src/dataflow/operators/partition.rs +++ b/timely/src/dataflow/operators/partition.rs @@ -6,7 +6,7 @@ use crate::dataflow::{Scope, Stream}; use crate::Data; /// Partition a stream of records into multiple streams. -pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> { +pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> { /// Produces `parts` output streams, containing records produced and assigned by `route`. /// /// # Examples @@ -27,12 +27,11 @@ pub trait Partition<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)> { impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D2, F> for Stream<G, D> { fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>> { - let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); let mut input = builder.new_input(self, Pipeline); - let mut outputs = Vec::new(); - let mut streams = Vec::new(); + let mut outputs = Vec::with_capacity(parts as usize); + let mut streams = Vec::with_capacity(parts as usize); for _ in 0 .. parts { let (output, stream) = builder.new_output(); @@ -47,6 +46,7 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D input.for_each(|time, data| { data.swap(&mut vector); let mut sessions = handles.iter_mut().map(|h| h.session(&time)).collect::<Vec<_>>(); + for datum in vector.drain(..) { let (part, datum2) = route(datum); sessions[part as usize].give(datum2); @@ -57,4 +57,4 @@ impl<G: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<G, D, D streams } -} \ No newline at end of file +} diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index d9d301f38..3eaf4ff93 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -298,7 +298,7 @@ impl<T: Timestamp> Builder<T> { // Initially this list contains observed locations with no incoming // edges, but as the algorithm develops we add to it any locations // that can only be reached by nodes that have been on this list. - let mut worklist = Vec::new(); + let mut worklist = Vec::with_capacity(in_degree.len()); for (key, val) in in_degree.iter() { if *val == 0 { worklist.push(*key); diff --git a/timely/src/synchronization/sequence.rs b/timely/src/synchronization/sequence.rs index be006de06..7525d3389 100644 --- a/timely/src/synchronization/sequence.rs +++ b/timely/src/synchronization/sequence.rs @@ -179,6 +179,8 @@ impl<T: ExchangeData> Sequencer<T> { // grab each command and queue it up input.for_each(|time, data| { data.swap(&mut vector); + + recvd.reserve(vector.len()); for (worker, counter, element) in vector.drain(..) { recvd.push(((time.time().clone(), worker, counter), element)); }