Skip to content

Commit

Permalink
Minor clippy cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Kixiron committed May 2, 2021
1 parent 3c85498 commit a7ad00e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 24 deletions.
52 changes: 30 additions & 22 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::worker::AsWorker;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use super::{Bundle, Message};

use crate::logging::TimelyLogger as Logger;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T: 'static, D: 'static> {
Expand Down Expand Up @@ -87,7 +87,7 @@ pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
counter: usize,
source: usize,
target: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

Expand All @@ -100,7 +100,7 @@ impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
counter: 0,
source,
target,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -111,22 +111,26 @@ impl<T, D, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

// Stamp the sequence number and source.
// FIXME: Awkward moment/logic.
if let Some(message) = bundle.if_mut() {
message.seq = self.counter-1;
message.seq = self.counter - 1;
message.from = self.source;
}

self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter-1,
length: bundle.data.len(),
}));
if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: true,
channel: self.channel,
source: self.source,
target: self.target,
seq_no: self.counter - 1,
length: bundle.data.len(),
})
}
}

self.pusher.push(pair);
}
}
Expand All @@ -137,7 +141,7 @@ pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: ::std::marker::PhantomData<(T, D)>,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

Expand All @@ -148,7 +152,7 @@ impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
puller,
channel,
index,
phantom: ::std::marker::PhantomData,
phantom: PhantomData,
logging,
}
}
Expand All @@ -161,15 +165,19 @@ impl<T, D, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
if let Some(bundle) = result {
let channel = self.channel;
let target = self.index;
self.logging.as_ref().map(|l| l.log(crate::logging::MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
}));

if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
is_send: false,
channel,
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
});
}
}

result
}
}
6 changes: 4 additions & 2 deletions timely/src/dataflow/channels/pushers/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use crate::Data;

use crate::communication::Push;

type PushList<T, D> = Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>;

/// Wraps a shared list of `Box<Push>` to forward pushes to. Owned by `Stream`.
pub struct Tee<T: 'static, D: 'static> {
buffer: Vec<D>,
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>,
shared: PushList<T, D>,
}

impl<T: Data, D: Data> Push<Bundle<T, D>> for Tee<T, D> {
Expand Down Expand Up @@ -82,7 +84,7 @@ where

/// A shared list of `Box<Push>` used to add `Push` implementors.
pub struct TeeHelper<T, D> {
shared: Rc<RefCell<Vec<Box<dyn Push<Bundle<T, D>>>>>>,
shared: PushList<T, D>,
}

impl<T, D> TeeHelper<T, D> {
Expand Down
6 changes: 6 additions & 0 deletions timely/src/dataflow/operators/capture/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ pub mod link {
}
}

impl<T, D> Default for EventLink<T, D> {
fn default() -> Self {
Self::new()
}
}

#[test]
fn avoid_stack_overflow_in_drop() {
let mut event1 = Rc::new(EventLink::<(),()>::new());
Expand Down

0 comments on commit a7ad00e

Please sign in to comment.