diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 66a06c6ece..8c4393f727 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -14,14 +14,36 @@ pub struct Counter>> { phantom: ::std::marker::PhantomData, } +/// A guard type that updates the change batch counts on drop +pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> { + consumed: &'a Rc>>, + time: Option, + len: usize, +} + +impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> { + fn drop(&mut self) { + self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64); + } +} + impl>> Counter { /// Retrieves the next timestamp and batch of data. #[inline] pub fn next(&mut self) -> Option<&mut Bundle> { + self.next_guarded().map(|(_guard, bundle)| bundle) + } + + #[inline] + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<'_, T>, &mut Bundle)> { if let Some(message) = self.pullable.pull() { if message.data.len() > 0 { - self.consumed.borrow_mut().update(message.time.clone(), message.data.len() as i64); - Some(message) + let guard = ConsumedGuard { + consumed: &self.consumed, + time: Some(message.time.clone()), + len: message.data.len(), + }; + Some((guard, message)) } else { None } } diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index 82009a95a0..9fbf27e90b 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -30,6 +30,7 @@ use crate::order::PartialOrder; use crate::progress::Timestamp; use crate::progress::ChangeBatch; use crate::scheduling::Activations; +use crate::dataflow::channels::pullers::counter::ConsumedGuard; /// An internal trait expressing the capability to send messages with a given timestamp. pub trait CapabilityTrait { @@ -231,6 +232,8 @@ type CapabilityUpdates = Rc>>>>>; pub struct CapabilityRef<'cap, T: Timestamp+'cap> { time: &'cap T, internal: CapabilityUpdates, + /// A drop guard that updates the consumed capability this CapabilityRef refers to on drop + _consumed_guard: ConsumedGuard<'cap, T>, } impl<'cap, T: Timestamp+'cap> CapabilityTrait for CapabilityRef<'cap, T> { @@ -244,10 +247,11 @@ impl<'cap, T: Timestamp+'cap> CapabilityTrait for CapabilityRef<'cap, T> { impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { /// Creates a new capability reference at `time` while incrementing (and keeping a reference to) /// the provided [`ChangeBatch`]. - pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates) -> Self { + pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates, guard: ConsumedGuard<'cap, T>) -> Self { CapabilityRef { time, internal, + _consumed_guard: guard, } } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index b9d69f634d..45b38f66d2 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -43,13 +43,13 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { #[inline] pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut>)> { let internal = &self.internal; - self.pull_counter.next().map(|bundle| { + self.pull_counter.next_guarded().map(|(guard, bundle)| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) + (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data)) }, } }) @@ -75,22 +75,13 @@ impl<'a, T: Timestamp, D: Data, P: Pull>> InputHandle { /// ``` #[inline] pub fn for_each, RefOrMut>)>(&mut self, mut logic: F) { - // We inline `next()` so that we can use `self.logging` without cloning (and dropping) the logger. - let internal = &self.internal; - while let Some((cap, data)) = self.pull_counter.next().map(|bundle| { - match bundle.as_ref_or_mut() { - RefOrMut::Ref(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Ref(&bundle.data)) - }, - RefOrMut::Mut(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone()), RefOrMut::Mut(&mut bundle.data)) - }, - } - }) { - self.logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); + let mut logging = self.logging.take(); + while let Some((cap, data)) = self.next() { + logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); logic(cap, data); - self.logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false })); + logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: false })); } + self.logging = logging; } }