diff --git a/timely/src/dataflow/channels/mod.rs b/timely/src/dataflow/channels/mod.rs index 553e8bda6..7c39ed667 100644 --- a/timely/src/dataflow/channels/mod.rs +++ b/timely/src/dataflow/channels/mod.rs @@ -61,17 +61,78 @@ impl Message { impl crate::communication::Bytesable for Message where T: Serialize + for<'a> Deserialize<'a>, - C: Serialize + for<'a> Deserialize<'a>, + C: ContainerBytes, { - fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { - ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed") + fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self { + let mut slice = &bytes[..]; + let from: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed"); + let seq: usize = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed"); + let time: T = ::bincode::deserialize(&mut slice).expect("bincode::deserialize() failed"); + let bytes_read = bytes.len() - slice.len(); + bytes.extract_to(bytes_read); + let data: C = ContainerBytes::from_bytes(bytes); + Self { time, data, from, seq } } fn length_in_bytes(&self) -> usize { - ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize + ::bincode::serialized_size(&self.from).expect("bincode::serialized_size() failed") as usize + + ::bincode::serialized_size(&self.seq).expect("bincode::serialized_size() failed") as usize + + ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize + + self.data.length_in_bytes() } fn into_bytes(&self, writer: &mut W) { - ::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed"); + ::bincode::serialize_into(&mut *writer, &self.from).expect("bincode::serialize_into() failed"); + ::bincode::serialize_into(&mut *writer, &self.seq).expect("bincode::serialize_into() failed"); + ::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed"); + self.data.into_bytes(&mut *writer); } -} \ No newline at end of file +} + + +/// A container-oriented version of `Bytesable` that can be implemented here for `Vec` and other containers. +pub trait ContainerBytes { + /// Wrap bytes as `Self`. + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self; + + /// The number of bytes required to serialize the data. + fn length_in_bytes(&self) -> usize; + + /// Writes the binary representation into `writer`. + fn into_bytes(&self, writer: &mut W); +} + +mod implementations { + + use serde::{Serialize, Deserialize}; + use crate::dataflow::channels::ContainerBytes; + + impl Deserialize<'a>> ContainerBytes for Vec { + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { + ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed") + } + + fn length_in_bytes(&self) -> usize { + ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize + } + + fn into_bytes(&self, writer: &mut W) { + ::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed"); + } + } + + use crate::container::flatcontainer::FlatStack; + impl Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack { + fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self { + ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed") + } + + fn length_in_bytes(&self) -> usize { + ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize + } + + fn into_bytes(&self, writer: &mut W) { + ::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed"); + } + } +} diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 8ef59598c..ae2261f66 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -68,7 +68,7 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. impl ParallelizationContract for ExchangeCore where - C: ExchangeData + PushPartitioned, + C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes, for<'a> H: FnMut(&C::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>; diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs index 19a34338e..cf02256aa 100644 --- a/timely/src/dataflow/operators/core/exchange.rs +++ b/timely/src/dataflow/operators/core/exchange.rs @@ -30,7 +30,7 @@ pub trait Exchange { impl Exchange for StreamCore where - C: PushPartitioned + ExchangeData, + C: PushPartitioned + ExchangeData + crate::dataflow::channels::ContainerBytes, { fn exchange(&self, route: F) -> StreamCore where