From a7ad00e04e04814bca086c007749b780854c583c Mon Sep 17 00:00:00 2001 From: Chase Wilson Date: Sun, 2 May 2021 16:25:32 -0500 Subject: [PATCH] Minor clippy cleanup --- timely/src/dataflow/channels/pact.rs | 52 +++++++++++-------- timely/src/dataflow/channels/pushers/tee.rs | 6 ++- .../src/dataflow/operators/capture/event.rs | 6 +++ 3 files changed, 40 insertions(+), 24 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 719e00344d..50356eb620 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -16,7 +16,7 @@ use crate::worker::AsWorker; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; use super::{Bundle, Message}; -use crate::logging::TimelyLogger as Logger; +use crate::logging::{TimelyLogger as Logger, MessagesEvent}; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { @@ -87,7 +87,7 @@ pub struct LogPusher>> { counter: usize, source: usize, target: usize, - phantom: ::std::marker::PhantomData<(T, D)>, + phantom: PhantomData<(T, D)>, logging: Option, } @@ -100,7 +100,7 @@ impl>> LogPusher { counter: 0, source, target, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, logging, } } @@ -111,22 +111,26 @@ impl>> Push> for LogPusher { fn push(&mut self, pair: &mut Option>) { if let Some(bundle) = pair { self.counter += 1; + // Stamp the sequence number and source. // FIXME: Awkward moment/logic. if let Some(message) = bundle.if_mut() { - message.seq = self.counter-1; + message.seq = self.counter - 1; message.from = self.source; } - self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent { - is_send: true, - channel: self.channel, - source: self.source, - target: self.target, - seq_no: self.counter-1, - length: bundle.data.len(), - })); + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: true, + channel: self.channel, + source: self.source, + target: self.target, + seq_no: self.counter - 1, + length: bundle.data.len(), + }) + } } + self.pusher.push(pair); } } @@ -137,7 +141,7 @@ pub struct LogPuller>> { puller: P, channel: usize, index: usize, - phantom: ::std::marker::PhantomData<(T, D)>, + phantom: PhantomData<(T, D)>, logging: Option, } @@ -148,7 +152,7 @@ impl>> LogPuller { puller, channel, index, - phantom: ::std::marker::PhantomData, + phantom: PhantomData, logging, } } @@ -161,15 +165,19 @@ impl>> Pull> for LogPuller { if let Some(bundle) = result { let channel = self.channel; let target = self.index; - self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent { - is_send: false, - channel, - source: bundle.from, - target, - seq_no: bundle.seq, - length: bundle.data.len(), - })); + + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: false, + channel, + source: bundle.from, + target, + seq_no: bundle.seq, + length: bundle.data.len(), + }); + } } + result } } diff --git a/timely/src/dataflow/channels/pushers/tee.rs b/timely/src/dataflow/channels/pushers/tee.rs index 274739a045..0ddd98beb6 100644 --- a/timely/src/dataflow/channels/pushers/tee.rs +++ b/timely/src/dataflow/channels/pushers/tee.rs @@ -9,10 +9,12 @@ use crate::Data; use crate::communication::Push; +type PushList = Rc>>>>>; + /// Wraps a shared list of `Box` to forward pushes to. Owned by `Stream`. pub struct Tee { buffer: Vec, - shared: Rc>>>>>, + shared: PushList, } impl Push> for Tee { @@ -82,7 +84,7 @@ where /// A shared list of `Box` used to add `Push` implementors. pub struct TeeHelper { - shared: Rc>>>>>, + shared: PushList, } impl TeeHelper { diff --git a/timely/src/dataflow/operators/capture/event.rs b/timely/src/dataflow/operators/capture/event.rs index d160a70406..31e3d6b0d8 100644 --- a/timely/src/dataflow/operators/capture/event.rs +++ b/timely/src/dataflow/operators/capture/event.rs @@ -101,6 +101,12 @@ pub mod link { } } + impl Default for EventLink { + fn default() -> Self { + Self::new() + } + } + #[test] fn avoid_stack_overflow_in_drop() { let mut event1 = Rc::new(EventLink::<(),()>::new());