timely: unconstrained lifetime for CapabilityRef (#491)
Since the merge of #429, `CapabilityRef`s have been safe to hold onto
across operator invocations, because that PR made sure that they only
decrement their progress counts on `Drop`. While this allowed
`async`/`await`-based operators to freely hold on to them, it was still
very difficult for synchronous operators to do the same, due to the
lifetime attached to the `CapabilityRef`.
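
To illustrate why the lifetime gets in the way of synchronous operators, here is a minimal sketch using only the standard library. `Updates`, `Guard` and `Puller` are hypothetical stand-ins for timely's `ChangeBatch`, `ConsumedGuard` and `Counter`, not the real types. Because the guard owns its own `Rc`, several guards can be stashed while the puller keeps being borrowed mutably, and the counts are only recorded when the guards are dropped.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for timely's `ChangeBatch<T>`: a log of (time, delta) updates.
type Updates = Rc<RefCell<Vec<(u64, i64)>>>;

/// Owned drop guard: records `len` consumed records at `time` when dropped.
struct Guard {
    consumed: Updates,
    time: Option<u64>,
    len: usize,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // `take()` leaves `None` behind, so the update is recorded once.
        if let Some(time) = self.time.take() {
            self.consumed.borrow_mut().push((time, self.len as i64));
        }
    }
}

/// Hypothetical puller that shares the update log with its guards.
struct Puller {
    consumed: Updates,
}

impl Puller {
    /// Takes `&mut self`, yet the returned guard owns an `Rc` clone and
    /// therefore does not keep `self` borrowed.
    fn next_guarded(&mut self, time: u64, len: usize) -> Guard {
        Guard { consumed: Rc::clone(&self.consumed), time: Some(time), len }
    }
}

/// Holds guards across several pulls, then releases them.
/// Returns the log length before the release and the log contents after it.
fn hold_across_pulls() -> (usize, Vec<(u64, i64)>) {
    let log: Updates = Rc::new(RefCell::new(Vec::new()));
    let mut puller = Puller { consumed: Rc::clone(&log) };
    let mut stash = Vec::new();
    stash.push(puller.next_guarded(1, 3));
    // A guard tied to the lifetime of `&mut puller` would reject this second call.
    stash.push(puller.next_guarded(2, 5));
    let before = log.borrow().len();
    drop(stash);
    let after = log.borrow().clone();
    (before, after)
}
```

Calling `hold_across_pulls()` shows that nothing is recorded while the guards are held and that both updates appear once the stash is dropped.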

We can observe that the lifetime no longer provides any benefit, which
means it can be removed, turning `CapabilityRef`s into fully owned
values. This allows any style of operator to easily hold on to them. The
benefit isn't just performance (by avoiding the `retain()` dance); it
also lets an operator defer to a later time the decision of which output
port a given input should flow to.
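
As a rough sketch of the deferred decision, the following models an operator that stashes owned input capabilities and picks an output port only when it flushes. The `Capability`, `InputCapability` and `Router` types here are simplified, hypothetical stand-ins (no progress tracking), and the even/odd routing rule is invented purely for illustration.

```rust
/// Hypothetical, simplified output capability: a time bound to an output port.
#[derive(Debug, PartialEq)]
struct Capability {
    time: u64,
    output_port: usize,
}

/// Hypothetical, simplified input capability: a time not yet bound to any output.
struct InputCapability {
    time: u64,
    outputs: usize,
}

impl InputCapability {
    /// Consumes the input capability and binds it to one output port.
    fn retain_for_output(self, output_port: usize) -> Capability {
        assert!(output_port < self.outputs, "non-existent output port");
        Capability { time: self.time, output_port }
    }
}

/// A synchronous operator body that stashes inputs and routes them later.
struct Router {
    stash: Vec<(InputCapability, Vec<i32>)>,
}

impl Router {
    /// Earlier invocation: hold on to what arrived; no output port is chosen yet.
    fn on_input(&mut self, cap: InputCapability, data: Vec<i32>) {
        self.stash.push((cap, data));
    }

    /// Later invocation: the routing decision is made only now.
    /// Illustrative rule: even sums go to port 0, odd sums to port 1.
    fn flush(&mut self) -> Vec<Capability> {
        self.stash
            .drain(..)
            .map(|(cap, data)| {
                let port = data.iter().sum::<i32>().rem_euclid(2) as usize;
                cap.retain_for_output(port)
            })
            .collect()
    }
}

fn route_later() -> Vec<Capability> {
    let mut router = Router { stash: Vec::new() };
    router.on_input(InputCapability { time: 1, outputs: 2 }, vec![1, 2, 3]); // sum 6
    router.on_input(InputCapability { time: 2, outputs: 2 }, vec![4, 5]); // sum 9
    router.flush()
}
```

With a borrowed capability, the operator would have had to call `retain()` for some output as soon as the input arrived; with an owned one, the choice can wait until `flush`.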

After making this change, the name `CapabilityRef` felt wrong, since
there is no reference to anything anymore. Instead, the main distinction
between a `CapabilityRef` and a `Capability` is that the former is
associated with an input port and the latter is associated with an
output port.

As such, I have renamed `CapabilityRef` to `InputCapability` to signal
to users that holding onto one of them represents holding onto a
timestamp at the input for which we have not yet determined the output
port that it should flow to. This nicely ties up the semantics of the
`InputCapability::retain_for_output` and
`InputCapability::delayed_for_output` methods, which make it clear by
their name and signature that this is what "transfers" the capability
from input ports to output ports.
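
The difference between the two methods can be sketched with a small model. Everything below is a hypothetical simplification: `OutputLogs` stands in for the per-output `ChangeBatch`es, and the method signatures take plain `u64` times rather than timely's generic timestamps. The point is only that `delayed_for_output` borrows the input capability while `retain_for_output` consumes it, and that each one registers a capability on a specific output port.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in: one (time, delta) log per output port.
type OutputLogs = Rc<RefCell<Vec<Vec<(u64, i64)>>>>;

/// Simplified output capability: +1 on creation, -1 on drop, for its port.
struct Capability {
    time: u64,
    port: usize,
    logs: OutputLogs,
}

impl Capability {
    fn new(time: u64, port: usize, logs: OutputLogs) -> Self {
        logs.borrow_mut()[port].push((time, 1));
        Capability { time, port, logs }
    }
}

impl Drop for Capability {
    fn drop(&mut self) {
        self.logs.borrow_mut()[self.port].push((self.time, -1));
    }
}

/// Simplified input capability: a time that is not yet bound to an output port.
struct InputCapability {
    time: u64,
    logs: OutputLogs,
}

impl InputCapability {
    /// Borrows `self`: mints a capability at a later-or-equal time.
    fn delayed_for_output(&self, new_time: u64, port: usize) -> Capability {
        assert!(self.time <= new_time, "cannot delay to an earlier time");
        assert!(port < self.logs.borrow().len(), "non-existent output port");
        Capability::new(new_time, port, Rc::clone(&self.logs))
    }

    /// Consumes `self`: the input capability becomes an output capability.
    fn retain_for_output(self, port: usize) -> Capability {
        assert!(port < self.logs.borrow().len(), "non-existent output port");
        Capability::new(self.time, port, Rc::clone(&self.logs))
    }
}

fn transfer() -> Vec<Vec<(u64, i64)>> {
    let logs: OutputLogs = Rc::new(RefCell::new(vec![Vec::new(), Vec::new()]));
    let input = InputCapability { time: 3, logs: Rc::clone(&logs) };
    let delayed = input.delayed_for_output(5, 1); // `input` is still usable
    let retained = input.retain_for_output(0); // `input` is moved here
    drop(delayed);
    drop(retained);
    let out = logs.borrow().clone();
    out
}
```

In this model, `transfer()` leaves a matching increment and decrement on each port's log: time 3 on port 0 from the retained capability and time 5 on port 1 from the delayed one.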

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
petrosagg authored Jan 29, 2023
1 parent 51212fe commit 6a73600
Showing 4 changed files with 44 additions and 36 deletions.
20 changes: 14 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
@@ -16,15 +16,23 @@ pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
 }

 /// A guard type that updates the change batch counts on drop
-pub struct ConsumedGuard<'a, T: Ord + Clone + 'static> {
-    consumed: &'a Rc<RefCell<ChangeBatch<T>>>,
+pub struct ConsumedGuard<T: Ord + Clone + 'static> {
+    consumed: Rc<RefCell<ChangeBatch<T>>>,
     time: Option<T>,
     len: usize,
 }

-impl<'a, T:Ord+Clone+'static> Drop for ConsumedGuard<'a, T> {
+impl<T:Ord+Clone+'static> ConsumedGuard<T> {
+    pub(crate) fn time(&self) -> &T {
+        &self.time.as_ref().unwrap()
+    }
+}
+
+impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
     fn drop(&mut self) {
-        self.consumed.borrow_mut().update(self.time.take().unwrap(), self.len as i64);
+        // SAFETY: we're in a Drop impl, so this runs at most once
+        let time = self.time.take().unwrap();
+        self.consumed.borrow_mut().update(time, self.len as i64);
     }
 }

@@ -36,11 +44,11 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
     }

     #[inline]
-    pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<'_, T>, &mut BundleCore<T, D>)> {
+    pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut BundleCore<T, D>)> {
         if let Some(message) = self.pullable.pull() {
             if message.data.len() > 0 {
                 let guard = ConsumedGuard {
-                    consumed: &self.consumed,
+                    consumed: Rc::clone(&self.consumed),
                     time: Some(message.time.clone()),
                     len: message.data.len(),
                 };
44 changes: 22 additions & 22 deletions timely/src/dataflow/operators/capability.rs
@@ -225,39 +225,39 @@ impl Error for DowngradeError {}

 type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;

-/// An unowned capability, which can be used but not retained.
+/// A capability of an input port. Holding onto this capability implicitly holds onto a
+/// capability for all the output ports this input is connected to, after the connection summaries
+/// have been applied.
 ///
-/// The capability reference supplies a `retain(self)` method which consumes the reference
-/// and turns it into an owned capability
-pub struct CapabilityRef<'cap, T: Timestamp+'cap> {
-    time: &'cap T,
+/// This input capability supplies a `retain_for_output(self)` method which consumes the input
+/// capability and turns it into a [Capability] for a specific output port.
+pub struct InputCapability<T: Timestamp> {
     internal: CapabilityUpdates<T>,
-    /// A drop guard that updates the consumed capability this CapabilityRef refers to on drop
-    _consumed_guard: ConsumedGuard<'cap, T>,
+    /// A drop guard that updates the consumed capability this InputCapability refers to on drop
+    consumed_guard: ConsumedGuard<T>,
 }

-impl<'cap, T: Timestamp+'cap> CapabilityTrait<T> for CapabilityRef<'cap, T> {
-    fn time(&self) -> &T { self.time }
+impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
+    fn time(&self) -> &T { self.time() }
     fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
         // let borrow = ;
         self.internal.borrow().iter().any(|rc| Rc::ptr_eq(rc, query_buffer))
     }
 }

-impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
+impl<T: Timestamp> InputCapability<T> {
     /// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
     /// the provided [`ChangeBatch`].
-    pub(crate) fn new(time: &'cap T, internal: CapabilityUpdates<T>, guard: ConsumedGuard<'cap, T>) -> Self {
-        CapabilityRef {
-            time,
+    pub(crate) fn new(internal: CapabilityUpdates<T>, guard: ConsumedGuard<T>) -> Self {
+        InputCapability {
             internal,
-            _consumed_guard: guard,
+            consumed_guard: guard,
         }
     }

     /// The timestamp associated with this capability.
     pub fn time(&self) -> &T {
-        self.time
+        self.consumed_guard.time()
     }

     /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
@@ -271,7 +271,7 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
     /// Delays capability for a specific output port.
     pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
         // TODO : Test operator summary?
-        if !self.time.less_equal(new_time) {
+        if !self.time().less_equal(new_time) {
             panic!("Attempted to delay {:?} to {:?}, which is not beyond the capability's time.", self, new_time);
         }
         if output_port < self.internal.borrow().len() {
@@ -295,26 +295,26 @@ impl<'cap, T: Timestamp + 'cap> CapabilityRef<'cap, T> {
     /// Transforms to an owned capability for a specific output port.
     pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
         if output_port < self.internal.borrow().len() {
-            Capability::new(self.time.clone(), self.internal.borrow()[output_port].clone())
+            Capability::new(self.time().clone(), self.internal.borrow()[output_port].clone())
         }
         else {
             panic!("Attempted to acquire a capability for a non-existent output port.");
         }
     }
 }

-impl<'cap, T: Timestamp> Deref for CapabilityRef<'cap, T> {
+impl<T: Timestamp> Deref for InputCapability<T> {
     type Target = T;

     fn deref(&self) -> &T {
-        self.time
+        self.time()
     }
 }

-impl<'cap, T: Timestamp> Debug for CapabilityRef<'cap, T> {
+impl<T: Timestamp> Debug for InputCapability<T> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.debug_struct("CapabilityRef")
-            .field("time", &self.time)
+        f.debug_struct("InputCapability")
+            .field("time", self.time())
             .field("internal", &"...")
             .finish()
     }
14 changes: 7 additions & 7 deletions timely/src/dataflow/operators/generic/handles.rs
@@ -17,7 +17,7 @@ use crate::communication::{Push, Pull, message::RefOrMut};
 use crate::Container;
 use crate::logging::TimelyLogger as Logger;

-use crate::dataflow::operators::CapabilityRef;
+use crate::dataflow::operators::InputCapability;
 use crate::dataflow::operators::capability::CapabilityTrait;

 /// Handle to an operator's input stream.
@@ -47,15 +47,15 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
     /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
     /// Returns `None` when there's no more data available.
     #[inline]
-    pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<D>)> {
+    pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
         let internal = &self.internal;
         self.pull_counter.next_guarded().map(|(guard, bundle)| {
             match bundle.as_ref_or_mut() {
                 RefOrMut::Ref(bundle) => {
-                    (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Ref(&bundle.data))
+                    (InputCapability::new(internal.clone(), guard), RefOrMut::Ref(&bundle.data))
                 },
                 RefOrMut::Mut(bundle) => {
-                    (CapabilityRef::new(&bundle.time, internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
+                    (InputCapability::new(internal.clone(), guard), RefOrMut::Mut(&mut bundle.data))
                 },
             }
         })
@@ -80,7 +80,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>> InputHandleCore<
     /// });
     /// ```
     #[inline]
-    pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
+    pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, mut logic: F) {
         let mut logging = self.logging.take();
         while let Some((cap, data)) = self.next() {
             logging.as_mut().map(|l| l.log(crate::logging::GuardedMessageEvent { is_start: true }));
@@ -105,7 +105,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInp
     /// The timestamp `t` of the input buffer can be retrieved by invoking `.time()` on the capability.
     /// Returns `None` when there's no more data available.
     #[inline]
-    pub fn next(&mut self) -> Option<(CapabilityRef<T>, RefOrMut<D>)> {
+    pub fn next(&mut self) -> Option<(InputCapability<T>, RefOrMut<D>)> {
         self.handle.next()
     }

@@ -128,7 +128,7 @@ impl<'a, T: Timestamp, D: Container, P: Pull<BundleCore<T, D>>+'a> FrontieredInp
     /// });
     /// ```
     #[inline]
-    pub fn for_each<F: FnMut(CapabilityRef<T>, RefOrMut<D>)>(&mut self, logic: F) {
+    pub fn for_each<F: FnMut(InputCapability<T>, RefOrMut<D>)>(&mut self, logic: F) {
         self.handle.for_each(logic)
     }

2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/mod.rs
@@ -62,4 +62,4 @@ pub mod count;

 // keep "mint" module-private
 mod capability;
-pub use self::capability::{ActivateCapability, Capability, CapabilityRef, CapabilitySet, DowngradeError};
+pub use self::capability::{ActivateCapability, Capability, InputCapability, CapabilitySet, DowngradeError};
