From 6a736009d0276b181f0a069174390b99c8bd9ae1 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Sun, 29 Jan 2023 18:23:40 +0100 Subject: [PATCH] timely: unconstrained lifetime for `CapabilityRef` (#491) Since the merge of https://github.com/TimelyDataflow/timely-dataflow/pull/429, `CapabilityRef`s have been made safe to hold onto across operator invocations because that PR made sure that they only decremented their progress counts on `Drop`. While this allowed `async`/`await`-based operators to freely hold on to them, it was still very difficult for synchronous operators to do the same thing, due to the lifetime attached to the `CapabilityRef`. We can observe that the lifetime no longer provides any benefits, which means it can be removed, turning `CapabilityRef`s into fully owned values. This allows any style of operator to easily hold on to them. The benefit isn't just performance (by avoiding the `retain()` dance); it also lets an operator defer to a later time the decision of which output port a given input should flow to. After making this change, the name `CapabilityRef` felt wrong, since there is no reference to anything anymore. Instead, the main distinction between `CapabilityRef`s and `Capability`s is that the former is associated with an input port and the latter is associated with an output port. As such, I have renamed `CapabilityRef` to `InputCapability` to signal to users that holding onto one of them represents holding onto a timestamp at the input for which we have not yet determined the output port that it should flow to. This nicely ties up the semantics of the `InputCapability::retain_for_output` and `InputCapability::delayed_for_output` methods, which make it clear by their names and signatures that this is what "transfers" the capability from input ports to output ports. 
Signed-off-by: Petros Angelatos --- .../src/dataflow/channels/pullers/counter.rs | 20 ++++++--- timely/src/dataflow/operators/capability.rs | 44 +++++++++---------- .../src/dataflow/operators/generic/handles.rs | 14 +++--- timely/src/dataflow/operators/mod.rs | 2 +- 4 files changed, 44 insertions(+), 36 deletions(-) diff --git a/timely/src/dataflow/channels/pullers/counter.rs b/timely/src/dataflow/channels/pullers/counter.rs index 8bc8606e7..50f0c546a 100644 --- a/timely/src/dataflow/channels/pullers/counter.rs +++ b/timely/src/dataflow/channels/pullers/counter.rs @@ -16,15 +16,23 @@ pub struct Counter>> { } /// A guard type that updates the change batch counts on drop -pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> { - consumed: &'a Rc>>, +pub struct ConsumedGuard { + consumed: Rc>>, time: Option, len: usize, } -impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> { +impl ConsumedGuard { + pub(crate) fn time(&self) -> &T { + &self.time.as_ref().unwrap() + } +} + +impl Drop for ConsumedGuard { fn drop(&mut self) { - self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64); + // SAFETY: we're in a Drop impl, so this runs at most once + let time = self.time.take().unwrap(); + self.consumed.borrow_mut().update(time, self.len as i64); } } @@ -36,11 +44,11 @@ impl>> Counter Option<(ConsumedGuard<'_, T>, &mut BundleCore)> { + pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard, &mut BundleCore)> { if let Some(message) = self.pullable.pull() { if message.data.len() > 0 { let guard = ConsumedGuard { - consumed: &self.consumed, + consumed: Rc::clone(&self.consumed), time: Some(message.time.clone()), len: message.data.len(), }; diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index a63d83374..8a7b9f28b 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -225,39 +225,39 @@ impl Error for DowngradeError {} type 
CapabilityUpdates = Rc>>>>>; -/// An unowned capability, which can be used but not retained. +/// A capability of an input port. Holding onto this capability implicitly holds onto a +/// capability for all the output ports this input is connected to, after the connection summaries +/// have been applied. /// -/// The capability reference supplies a `retain(self)` method which consumes the reference -/// and turns it into an owned capability -pub struct CapabilityRef<'cap, T: Timestamp+'cap> { - time: &'cap T, +/// This input capability supplies a `retain_for_output(self)` method which consumes the input +/// capability and turns it into a [Capability] for a specific output port. +pub struct InputCapability { internal: CapabilityUpdates, - /// A drop guard that updates the consumed capability this CapabilityRef refers to on drop - _consumed_guard: ConsumedGuard<'cap, T>, + /// A drop guard that updates the consumed capability this InputCapability refers to on drop + consumed_guard: ConsumedGuard, } -impl<'cap, T: Timestamp+'cap> CapabilityTrait for CapabilityRef<'cap, T> { - fn time(&self) -> &T { self.time } +impl CapabilityTrait for InputCapability { + fn time(&self) -> &T { self.time() } fn valid_for_output(&self, query_buffer: &Rc>>) -> bool { // let borrow = ; self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer)) } } -impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { +impl InputCapability { /// Creates a new capability reference at `time` while incrementing (and keeping a reference to) /// the provided [`ChangeBatch`]. - pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates, guard: ConsumedGuard<'cap, T>) -> Self { - CapabilityRef { - time, + pub(crate) fn new(internal: CapabilityUpdates, guard: ConsumedGuard) -> Self { + InputCapability { internal, - _consumed_guard: guard, + consumed_guard: guard, } } /// The timestamp associated with this capability. 
pub fn time(&self) -> &T { - self.time + self.consumed_guard.time() } /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of @@ -271,7 +271,7 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { /// Delays capability for a specific output port. pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability { // TODO : Test operator summary? - if !self.time.less_equal(new_time) { + if !self.time().less_equal(new_time) { panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time); } if output_port < self.internal.borrow().len() { @@ -295,7 +295,7 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { /// Transforms to an owned capability for a specific output port. pub fn retain_for_output(self, output_port: usize) -> Capability { if output_port < self.internal.borrow().len() { - Capability::new(self.time.clone(), self.internal.borrow()[output_port].clone()) + Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone()) } else { panic!("Attempted to acquire a capability for a non-existent output port."); @@ -303,18 +303,18 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> { } } -impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> { +impl Deref for InputCapability { type Target = T; fn deref(&self) -> &T { - self.time + self.time() } } -impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> { +impl Debug for InputCapability { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("CapabilityRef") - .field("time", &self.time) + f.debug_struct("InputCapability") + .field("time", self.time()) .field("internal", &"...") .finish() } diff --git a/timely/src/dataflow/operators/generic/handles.rs b/timely/src/dataflow/operators/generic/handles.rs index 548caf5dd..c73c928bc 100644 --- a/timely/src/dataflow/operators/generic/handles.rs +++ b/timely/src/dataflow/operators/generic/handles.rs @@ -17,7 +17,7 @@ use 
crate::communication::{Push, Pull, message::RefOrMut}; use crate::Container; use crate::logging::TimelyLogger as Logger; -use crate::dataflow::operators::CapabilityRef; +use crate::dataflow::operators::InputCapability; use crate::dataflow::operators::capability::CapabilityTrait; /// Handle to an operator's input stream. @@ -47,15 +47,15 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. #[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { let internal = &self.internal; self.pull_counter.next_guarded().map(|(guard, bundle)| { match bundle.as_ref_or_mut() { RefOrMut::Ref(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data)) + (InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data)) }, RefOrMut::Mut(bundle) => { - (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data)) + (InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data)) }, } }) @@ -80,7 +80,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>> InputHandleCore< /// }); /// ``` #[inline] - pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { + pub fn for_each, RefOrMut)>(&mut self, mut logic: F) { let mut logging = self.logging.take(); while let Some((cap, data)) = self.next() { logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true })); @@ -105,7 +105,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability. /// Returns `None` when there's no more data available. 
#[inline] - pub fn next(&mut self) -> Option<(CapabilityRef, RefOrMut)> { + pub fn next(&mut self) -> Option<(InputCapability, RefOrMut)> { self.handle.next() } @@ -128,7 +128,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull>+'a> FrontieredInp /// }); /// ``` #[inline] - pub fn for_each, RefOrMut)>(&mut self, logic: F) { + pub fn for_each, RefOrMut)>(&mut self, logic: F) { self.handle.for_each(logic) } diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index 6f63f099b..508d10ac6 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -62,4 +62,4 @@ pub mod count; // keep "mint" module-private mod capability; -pub use self::capability::{ActivateCapability, Capability, CapabilityRef, CapabilitySet, DowngradeError}; +pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError};