Skip to content

Commit

Permalink
Move opinions about encoding from communication to timely. (#597)
Browse files Browse the repository at this point in the history
* Introduce Bytesable trait

* Use Bytesable for serialization

* Remove Message from Allocate::pipeline

* Remove communication dependence on Message

* Move opinions about encoding from communication to timely

* Move opinions about bincode into pacts

* Rebase and respond to fix-ups
  • Loading branch information
frankmcsherry authored Nov 11, 2024
1 parent 229c114 commit 54974b9
Show file tree
Hide file tree
Showing 21 changed files with 398 additions and 226 deletions.
1 change: 0 additions & 1 deletion communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ default = ["getopts"]

[dependencies]
getopts = { version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
timely_bytes = { path = "../bytes", version = "0.12" }
Expand Down
27 changes: 23 additions & 4 deletions communication/examples/comm_hello.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
use std::ops::Deref;
use timely_communication::{Message, Allocate};
use timely_communication::{Allocate, Bytesable};

/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}

impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}

fn length_in_bytes(&self) -> usize {
self.payload.len()
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}

fn main() {

Expand All @@ -14,7 +33,7 @@ fn main() {

// send typed data along each channel
for i in 0 .. allocator.peers() {
senders[i].send(Message::from_typed(format!("hello, {}", i)));
senders[i].send(Message { payload: format!("hello, {}", i)});
senders[i].done();
}

Expand All @@ -26,7 +45,7 @@ fn main() {
allocator.receive();

if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.deref());
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}

Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use std::cell::RefCell;

use crate::allocator::thread::ThreadBuilder;
use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
use crate::allocator::{Allocate, AllocateBuilder, Thread, Process};
use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process};
use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};

use crate::{Push, Pull, Data, Message};
use crate::{Push, Pull};

/// Enumerates known implementors of `Allocate`.
/// Passes trait method calls on to members.
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Generic {
}
}
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
match self {
Generic::Thread(t) => t.allocate(identifier),
Generic::Process(p) => p.allocate(identifier),
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Generic {
impl Allocate for Generic {
fn index(&self) -> usize { self.index() }
fn peers(&self) -> usize { self.peers() }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
self.allocate(identifier)
}

Expand Down
14 changes: 10 additions & 4 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub mod counters;

pub mod zero_copy;

use crate::{Data, Push, Pull, Message};
use crate::{Bytesable, Push, Pull};

/// A proto-allocator, which implements `Send` and can be completed with `build`.
///
Expand All @@ -32,6 +32,12 @@ pub trait AllocateBuilder : Send {
fn build(self) -> Self::Allocator;
}

use std::any::Any;

/// A type that can be sent along an allocated channel.
pub trait Exchangeable : Send+Any+Bytesable { }
impl<T: Send+Any+Bytesable> Exchangeable for T { }

/// A type capable of allocating channels.
///
/// There is some feature creep, in that this contains several convenience methods about the nature
Expand All @@ -42,7 +48,7 @@ pub trait Allocate {
/// The number of workers in the communication group.
fn peers(&self) -> usize;
/// Constructs several send endpoints and one receive endpoint.
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>);
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
/// A shared queue of communication events with channel identifier.
///
/// It is expected that users of the channel allocator will regularly
Expand Down Expand Up @@ -85,8 +91,8 @@ pub trait Allocate {
/// By default, this method uses the thread-local channel constructor
/// based on a shared `VecDeque` which updates the event queue.
fn pipeline<T: 'static>(&mut self, identifier: usize) ->
(thread::ThreadPusher<Message<T>>,
thread::ThreadPuller<Message<T>>)
(thread::ThreadPusher<T>,
thread::ThreadPuller<T>)
{
thread::Thread::new_from(identifier, self.events().clone())
}
Expand Down
12 changes: 6 additions & 6 deletions communication/src/allocator/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crossbeam_channel::{Sender, Receiver};

use crate::allocator::thread::{ThreadBuilder};
use crate::allocator::{Allocate, AllocateBuilder, Thread};
use crate::{Push, Pull, Message};
use crate::{Push, Pull};
use crate::buzzer::Buzzer;

/// An allocator for inter-thread, intra-process communication
Expand Down Expand Up @@ -110,7 +110,7 @@ impl Process {
impl Allocate for Process {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Any+Send+Sync+'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Any+Send>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// this is race-y global initialisation of all channels for all workers, performed by the
// first worker that enters this critical section
Expand All @@ -126,7 +126,7 @@ impl Allocate for Process {
let mut pushers = Vec::with_capacity(self.peers);
let mut pullers = Vec::with_capacity(self.peers);
for buzzer in self.buzzers.iter() {
let (s, r): (Sender<Message<T>>, Receiver<Message<T>>) = crossbeam_channel::unbounded();
let (s, r): (Sender<T>, Receiver<T>) = crossbeam_channel::unbounded();
// TODO: the buzzer in the pusher may be redundant, because we need to buzz post-counter.
pushers.push((Pusher { target: s }, buzzer.clone()));
pullers.push(Puller { source: r, current: None });
Expand All @@ -142,7 +142,7 @@ impl Allocate for Process {

let vector =
entry
.downcast_mut::<Vec<Option<(Vec<(Pusher<Message<T>>, Buzzer)>, Puller<Message<T>>)>>>()
.downcast_mut::<Vec<Option<(Vec<(Pusher<T>, Buzzer)>, Puller<T>)>>>()
.expect("failed to correctly cast channel");

let (sends, recv) =
Expand All @@ -166,10 +166,10 @@ impl Allocate for Process {
sends.into_iter()
.zip(self.counters_send.iter())
.map(|((s,b), sender)| CountPusher::new(s, identifier, sender.clone(), b))
.map(|s| Box::new(s) as Box<dyn Push<super::Message<T>>>)
.map(|s| Box::new(s) as Box<dyn Push<T>>)
.collect::<Vec<_>>();

let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<super::Message<T>>>;
let recv = Box::new(CountPuller::new(recv, identifier, self.inner.events().clone())) as Box<dyn Pull<T>>;

(sends, recv)
}
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::VecDeque;
use crate::allocator::{Allocate, AllocateBuilder};
use crate::allocator::counters::Pusher as CountPusher;
use crate::allocator::counters::Puller as CountPuller;
use crate::{Push, Pull, Message};
use crate::{Push, Pull};

/// Builder for single-threaded allocator.
pub struct ThreadBuilder;
Expand All @@ -28,7 +28,7 @@ pub struct Thread {
impl Allocate for Thread {
fn index(&self) -> usize { 0 }
fn peers(&self) -> usize { 1 }
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: 'static>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
let (pusher, puller) = Thread::new_from(identifier, self.events.clone());
(vec![Box::new(pusher)], Box::new(puller))
}
Expand Down Expand Up @@ -62,9 +62,9 @@ impl Thread {

/// Creates a new thread-local channel from an identifier and shared counts.
pub fn new_from<T: 'static>(identifier: usize, events: Rc<RefCell<Vec<usize>>>)
-> (ThreadPusher<Message<T>>, ThreadPuller<Message<T>>)
-> (ThreadPusher<T>, ThreadPuller<T>)
{
let shared = Rc::new(RefCell::new((VecDeque::<Message<T>>::new(), VecDeque::<Message<T>>::new())));
let shared = Rc::new(RefCell::new((VecDeque::<T>::new(), VecDeque::<T>::new())));
let pusher = Pusher { target: shared.clone() };
let pusher = CountPusher::new(pusher, identifier, events.clone());
let puller = Puller { source: shared, current: None };
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::AllocateBuilder;
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -135,7 +135,7 @@ pub struct TcpAllocator<A: Allocate> {
impl<A: Allocate> Allocate for TcpAllocator<A> {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
Expand All @@ -144,7 +144,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
self.channel_id_bound = Some(identifier);

// Result list of boxed pushers.
let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::new();
let mut pushes = Vec::<Box<dyn Push<T>>>::new();

// Inner exchange allocations.
let inner_peers = self.inner.peers();
Expand Down
8 changes: 4 additions & 4 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use timely_bytes::arc::Bytes;

use crate::networking::MessageHeader;

use crate::{Allocate, Message, Data, Push, Pull};
use crate::allocator::{AllocateBuilder};
use crate::{Allocate, Push, Pull};
use crate::allocator::{AllocateBuilder, Exchangeable};
use crate::allocator::canary::Canary;

use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
Expand Down Expand Up @@ -119,15 +119,15 @@ pub struct ProcessAllocator {
impl Allocate for ProcessAllocator {
fn index(&self) -> usize { self.index }
fn peers(&self) -> usize { self.peers }
fn allocate<T: Data>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<Message<T>>>>, Box<dyn Pull<Message<T>>>) {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {

// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
}
self.channel_id_bound = Some(identifier);

let mut pushes = Vec::<Box<dyn Push<Message<T>>>>::with_capacity(self.peers());
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.peers());

for target_index in 0 .. self.peers() {

Expand Down
32 changes: 15 additions & 17 deletions communication/src/allocator/zero_copy/push_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use timely_bytes::arc::Bytes;

use crate::allocator::canary::Canary;
use crate::networking::MessageHeader;

use crate::{Data, Push, Pull};
use crate::allocator::Message;
use crate::{Bytesable, Push, Pull};

use super::bytes_exchange::{BytesPush, SendEndpoint};

Expand All @@ -35,9 +33,9 @@ impl<T, P: BytesPush> Pusher<T, P> {
}
}

impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
#[inline]
fn push(&mut self, element: &mut Option<Message<T>>) {
fn push(&mut self, element: &mut Option<T>) {
if let Some(ref mut element) = *element {

// determine byte lengths and build header.
Expand Down Expand Up @@ -68,11 +66,11 @@ impl<T:Data, P: BytesPush> Push<Message<T>> for Pusher<T, P> {
/// allocation.
pub struct Puller<T> {
_canary: Canary,
current: Option<Message<T>>,
current: Option<T>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}

impl<T:Data> Puller<T> {
impl<T: Bytesable> Puller<T> {
/// Creates a new `Puller` instance from a shared queue.
pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
Puller {
Expand All @@ -83,14 +81,14 @@ impl<T:Data> Puller<T> {
}
}

impl<T:Data> Pull<Message<T>> for Puller<T> {
impl<T: Bytesable> Pull<T> for Puller<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
fn pull(&mut self) -> &mut Option<T> {
self.current =
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
.map(T::from_bytes);

&mut self.current
}
Expand All @@ -103,15 +101,15 @@ impl<T:Data> Pull<Message<T>> for Puller<T> {
/// like the `bytes` crate (../bytes/) which provides an exclusive view of a shared
/// allocation.
pub struct PullerInner<T> {
inner: Box<dyn Pull<Message<T>>>, // inner pullable (e.g. intra-process typed queue)
inner: Box<dyn Pull<T>>, // inner pullable (e.g. intra-process typed queue)
_canary: Canary,
current: Option<Message<T>>,
current: Option<T>,
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
}

impl<T:Data> PullerInner<T> {
impl<T: Bytesable> PullerInner<T> {
/// Creates a new `PullerInner` instance from a shared queue.
pub fn new(inner: Box<dyn Pull<Message<T>>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
PullerInner {
inner,
_canary,
Expand All @@ -121,9 +119,9 @@ impl<T:Data> PullerInner<T> {
}
}

impl<T:Data> Pull<Message<T>> for PullerInner<T> {
impl<T: Bytesable> Pull<T> for PullerInner<T> {
#[inline]
fn pull(&mut self) -> &mut Option<Message<T>> {
fn pull(&mut self) -> &mut Option<T> {

let inner = self.inner.pull();
if inner.is_some() {
Expand All @@ -134,7 +132,7 @@ impl<T:Data> Pull<Message<T>> for PullerInner<T> {
self.receiver
.borrow_mut()
.pop_front()
.map(Message::from_bytes);
.map(T::from_bytes);

&mut self.current
}
Expand Down
Loading

0 comments on commit 54974b9

Please sign in to comment.