Skip to content

Commit

Permalink
Remove Bundle type and have Message implement Bytesable directly
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 24, 2024
1 parent 6ce139a commit 7bfbefe
Show file tree
Hide file tree
Showing 12 changed files with 112 additions and 95 deletions.
29 changes: 23 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ pub mod pullers;
/// Parallelization contracts, describing how data must be exchanged between operators.
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type Bundle<T, C> = crate::Bincode<Message<T, C>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Serialize, Deserialize)]
pub struct Message<T, C> {
Expand Down Expand Up @@ -44,17 +41,37 @@ impl<T, C: Container> Message<T, C> {
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element. The buffer is cleared.
#[inline]
pub fn push_at<P: Push<Bundle<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(Bundle::from(message));
let mut bundle = Some(Message::from(message));

pusher.push(&mut bundle);

if let Some(message) = bundle {
*buffer = message.payload.data;
*buffer = message.data;
buffer.clear();
}
}
}

// Serialization support for `Message`, delegating to `bincode`.
// Intended to swap out the constraint on `C` for `C: Bytesable`.
impl<T, C> crate::communication::Bytesable for Message<T, C>
where
    T: Serialize + for<'de> Deserialize<'de>,
    C: Serialize + for<'de> Deserialize<'de>,
{
    /// Decodes a message from `bytes` using `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode` fails to deserialize the bytes.
    fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
        let decoded = ::bincode::deserialize(&bytes[..]);
        decoded.expect("bincode::deserialize() failed")
    }

    /// Reports the serialized size of the message as computed by `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode` fails to compute the size.
    fn length_in_bytes(&self) -> usize {
        let size = ::bincode::serialized_size(self);
        size.expect("bincode::serialized_size() failed") as usize
    }

    /// Serializes the message into `writer` using `bincode`.
    ///
    /// # Panics
    ///
    /// Panics if `bincode` fails to serialize into the writer.
    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
        let outcome = ::bincode::serialize_into(writer, self);
        outcome.expect("bincode::serialize_into() failed");
    }
}
38 changes: 19 additions & 19 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::container::PushPartitioned;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::Bundle;
use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;
Expand All @@ -24,9 +24,9 @@ use crate::ExchangeData;
/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, C> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<Bundle<T, C>>+'static;
type Pusher: Push<Message<T, C>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<Bundle<T, C>>+'static;
type Puller: Pull<Message<T, C>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
Expand All @@ -36,10 +36,10 @@ pub trait ParallelizationContract<T, C> {
pub struct Pipeline;

impl<T: 'static, C: Container> ParallelizationContract<T, C> for Pipeline {
type Pusher = LogPusher<T, C, ThreadPusher<Bundle<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Bundle<T, C>>>;
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Bundle<T, C>>(identifier, address);
let (pusher, puller) = allocator.pipeline::<Message<T, C>>(identifier, address);
(LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()),
LogPuller::new(puller, allocator.index(), identifier, logging))
}
Expand Down Expand Up @@ -71,11 +71,11 @@ where
C: ExchangeData + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Bundle<T, C>>(identifier, address);
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
}
Expand All @@ -89,7 +89,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a pusher of `Message<T, C>`, stamping each message with a sequence number and source and logging it, to provide a `Push<Message<T, C>>`.
#[derive(Debug)]
pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
pub struct LogPusher<T, C, P: Push<Message<T, C>>> {
pusher: P,
channel: usize,
counter: usize,
Expand All @@ -99,7 +99,7 @@ pub struct LogPusher<T, C, P: Push<Bundle<T, C>>> {
logging: Option<Logger>,
}

impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -114,16 +114,16 @@ impl<T, C, P: Push<Bundle<T, C>>> LogPusher<T, C, P> {
}
}

impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T, C, P> {
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
#[inline]
fn push(&mut self, pair: &mut Option<Bundle<T, C>>) {
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
if let Some(bundle) = pair {
self.counter += 1;

// Stamp the sequence number and source.
// FIXME: Awkward moment/logic.
bundle.payload.seq = self.counter - 1;
bundle.payload.from = self.source;
bundle.seq = self.counter - 1;
bundle.from = self.source;

if let Some(logger) = self.logging.as_ref() {
logger.log(MessagesEvent {
Expand All @@ -143,15 +143,15 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Push<Bundle<T, C>> for LogPusher<T,

/// Wraps a puller of `Message<T, C>` to provide a `Pull<Message<T, C>>`.
#[derive(Debug)]
pub struct LogPuller<T, C, P: Pull<Bundle<T, C>>> {
pub struct LogPuller<T, C, P: Pull<Message<T, C>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, C)>,
logging: Option<Logger>,
}

impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -164,9 +164,9 @@ impl<T, C, P: Pull<Bundle<T, C>>> LogPuller<T, C, P> {
}
}

impl<T, C: Container, P: Pull<Bundle<T, C>>> Pull<Bundle<T, C>> for LogPuller<T, C, P> {
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
#[inline]
fn pull(&mut self) -> &mut Option<Bundle<T, C>> {
fn pull(&mut self) -> &mut Option<Message<T, C>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::dataflow::channels::Bundle;
use crate::dataflow::channels::Message;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts for records pulled through it in a shared count map.
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> {
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<C>,
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut Bundle<T, C>> {
pub fn next(&mut self) -> Option<&mut Message<T, C>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, C>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Message<T, C>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, C: Container, P: Pull<Bundle<T, C>>> Counter<T, C, P>
}
}

impl<T:Ord+Clone+'static, C, P: Pull<Bundle<T, C>>> Counter<T, C, P> {
impl<T:Ord+Clone+'static, C, P: Pull<Message<T, C>>> Counter<T, C, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
24 changes: 12 additions & 12 deletions timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::channels::Message;
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::Container;
Expand Down Expand Up @@ -44,7 +44,7 @@ impl<T, CB: Default, P> Buffer<T, CB, P> {
}
}

impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
impl<T, C: Container, P: Push<Message<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P> where T: Eq+Clone {
/// Returns a `Session`, which accepts data to send at the associated time
#[inline]
pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
Expand All @@ -66,7 +66,7 @@ impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
/// Returns a `Session`, which accepts data to send at the associated time
pub fn session_with_builder(&mut self, time: &T) -> Session<T, CB, P> {
if let Some(true) = self.time.as_ref().map(|x| x != time) { self.flush(); }
Expand All @@ -85,7 +85,7 @@ impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P
}
}

impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB, P> where T: Eq+Clone {
/// Flushes all data and pushes a `None` to `self.pusher`, indicating a flush.
pub fn cease(&mut self) {
self.flush();
Expand Down Expand Up @@ -115,7 +115,7 @@ impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
where
T: Eq+Clone,
CB: ContainerBuilder + PushInto<D>,
P: Push<Bundle<T, CB::Container>>
P: Push<Message<T, CB::Container>>
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -136,7 +136,7 @@ pub struct Session<'a, T, CB, P> {
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
P: Push<Bundle<T, C>> + 'a,
P: Push<Message<T, C>> + 'a,
{
/// Provide a container at the time specified by the [Session].
pub fn give_container(&mut self, container: &mut C) {
Expand All @@ -148,7 +148,7 @@ impl<'a, T, CB, P> Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a
P: Push<Message<T, CB::Container>> + 'a
{
/// Access the builder. Immutable access to prevent races with flushing
/// the underlying buffer.
Expand Down Expand Up @@ -179,7 +179,7 @@ impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -192,7 +192,7 @@ pub struct AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// A reference to the underlying buffer.
buffer: &'a mut Buffer<T, CB, P>,
Expand All @@ -204,7 +204,7 @@ impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
/// Transmits a single record.
#[inline]
Expand All @@ -231,7 +231,7 @@ impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
#[inline]
fn push_into(&mut self, item: D) {
Expand All @@ -243,7 +243,7 @@ impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
P: Push<Message<T, CB::Container>> + 'a,
{
fn drop(&mut self) {
self.buffer.cease();
Expand Down
10 changes: 5 additions & 5 deletions timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::{ChangeBatch, Timestamp};
use crate::dataflow::channels::Bundle;
use crate::dataflow::channels::Message;
use crate::communication::Push;
use crate::Container;

/// A wrapper which updates shared `produced` based on the number of records pushed.
#[derive(Debug)]
pub struct Counter<T, C, P: Push<Bundle<T, C>>> {
pub struct Counter<T, C, P: Push<Message<T, C>>> {
pushee: P,
produced: Rc<RefCell<ChangeBatch<T>>>,
phantom: PhantomData<C>,
}

impl<T: Timestamp, C: Container, P> Push<Bundle<T, C>> for Counter<T, C, P> where P: Push<Bundle<T, C>> {
impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
#[inline]
fn push(&mut self, message: &mut Option<Bundle<T, C>>) {
fn push(&mut self, message: &mut Option<Message<T, C>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
}
Expand All @@ -31,7 +31,7 @@ impl<T: Timestamp, C: Container, P> Push<Bundle<T, C>> for Counter<T, C, P> wher
}
}

impl<T, C, P: Push<Bundle<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
impl<T, C, P: Push<Message<T, C>>> Counter<T, C, P> where T : Ord+Clone+'static {
/// Allocates a new `Counter` from a pushee and shared counts.
pub fn new(pushee: P) -> Counter<T, C, P> {
Counter {
Expand Down
Loading

0 comments on commit 7bfbefe

Please sign in to comment.