Skip to content

Commit

Permalink
Remove abomonation to reduce unsoundness
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Aug 20, 2024
1 parent f659c51 commit 3d1b434
Show file tree
Hide file tree
Showing 18 changed files with 58 additions and 192 deletions.
4 changes: 1 addition & 3 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ default = ["getopts"]

[dependencies]
getopts = { version = "0.2.14", optional = true }
bincode = { version = "1.0", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
serde_derive = "1.0"
serde = "1.0"
abomonation = "0.7"
abomonation_derive = "0.5"
timely_bytes = { path = "../bytes", version = "0.12" }
timely_logging = { path = "../logging", version = "0.12" }
crossbeam-channel = "0.5.0"
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/push_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<T:Data> Pull<Message<T>> for Puller<T> {
self.receiver
.borrow_mut()
.pop_front()
.map(|bytes| unsafe { Message::from_bytes(bytes) });
.map(|bytes| Message::from_bytes(bytes));

&mut self.current
}
Expand Down Expand Up @@ -134,7 +134,7 @@ impl<T:Data> Pull<Message<T>> for PullerInner<T> {
self.receiver
.borrow_mut()
.pop_front()
.map(|bytes| unsafe { Message::from_bytes(bytes) });
.map(|bytes| Message::from_bytes(bytes));

&mut self.current
}
Expand Down
19 changes: 1 addition & 18 deletions communication/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
//!
//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait when using the
//! `bincode` feature or the [`Abomonation`](abomonation::Abomonation) trait when not.
//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait.
//!
//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
//! traits), which is used for more precise control of resources.
Expand Down Expand Up @@ -77,14 +76,9 @@

#[cfg(feature = "getopts")]
extern crate getopts;
#[cfg(feature = "bincode")]
extern crate bincode;
#[cfg(feature = "bincode")]
extern crate serde;

extern crate abomonation;
#[macro_use] extern crate abomonation_derive;

extern crate timely_bytes as bytes;
extern crate timely_logging as logging_core;

Expand All @@ -97,26 +91,15 @@ pub mod buzzer;

use std::any::Any;

#[cfg(feature = "bincode")]
use serde::{Serialize, Deserialize};
#[cfg(not(feature = "bincode"))]
use abomonation::Abomonation;

pub use allocator::Generic as Allocator;
pub use allocator::Allocate;
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
pub use message::Message;

/// A composite trait for types that may be used with channels.
#[cfg(not(feature = "bincode"))]
pub trait Data : Send+Sync+Any+Abomonation+'static { }
#[cfg(not(feature = "bincode"))]
impl<T: Send+Sync+Any+Abomonation+'static> Data for T { }

/// A composite trait for types that may be used with channels.
#[cfg(feature = "bincode")]
pub trait Data : Send+Sync+Any+Serialize+for<'a>Deserialize<'a>+'static { }
#[cfg(feature = "bincode")]
impl<T: Send+Sync+Any+Serialize+for<'a>Deserialize<'a>+'static> Data for T { }

/// Pushing elements of type `T`.
Expand Down
8 changes: 4 additions & 4 deletions communication/src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Configuration and events for communication logging.
/// Configuration information about a communication thread.
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct CommunicationSetup {
/// True when this is a send thread (or the receive thread).
pub sender: bool,
Expand All @@ -12,7 +12,7 @@ pub struct CommunicationSetup {
}

/// Various communication events.
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum CommunicationEvent {
/// An observed message.
Message(MessageEvent),
Expand All @@ -21,7 +21,7 @@ pub enum CommunicationEvent {
}

/// An observed message.
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct MessageEvent {
/// true for send event, false for receive event
pub is_send: bool,
Expand All @@ -30,7 +30,7 @@ pub struct MessageEvent {
}

/// Starting or stopping communication threads.
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct StateEvent {
/// Is the thread a send (vs a recv) thread.
pub send: bool,
Expand Down
54 changes: 0 additions & 54 deletions communication/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
use std::sync::Arc;
use bytes::arc::Bytes;
use abomonation;
use crate::Data;

/// Either an immutable or mutable reference.
Expand Down Expand Up @@ -68,8 +67,6 @@ pub struct Message<T> {

/// Possible returned representations from a channel.
enum MessageContents<T> {
/// Binary representation. Only available as a reference.
Binary(abomonation::abomonated::Abomonated<T, Bytes>),
/// Rust typed instance. Available for ownership.
Owned(T),
/// Atomic reference counted. Only available as a reference.
Expand All @@ -88,15 +85,13 @@ impl<T> Message<T> {
/// Destructures and returns any typed data.
pub fn if_typed(self) -> Option<T> {
match self.payload {
MessageContents::Binary(_) => None,
MessageContents::Owned(typed) => Some(typed),
MessageContents::Arc(_) => None,
}
}
/// Returns a mutable reference, if typed.
pub fn if_mut(&mut self) -> Option<&mut T> {
match &mut self.payload {
MessageContents::Binary(_) => None,
MessageContents::Owned(typed) => Some(typed),
MessageContents::Arc(_) => None,
}
Expand All @@ -108,54 +103,12 @@ impl<T> Message<T> {
/// data are serialized binary data.
pub fn as_ref_or_mut(&mut self) -> RefOrMut<T> {
match &mut self.payload {
MessageContents::Binary(bytes) => { RefOrMut::Ref(bytes) },
MessageContents::Owned(typed) => { RefOrMut::Mut(typed) },
MessageContents::Arc(typed) => { RefOrMut::Ref(typed) },
}
}
}

// These methods require `T` to implement `Abomonation`, for serialization functionality.
#[cfg(not(feature = "bincode"))]
impl<T: Data> Message<T> {
/// Wrap bytes as a message.
///
/// # Safety
///
/// This method is unsafe, in that `Abomonated::new()` is unsafe: it presumes that
/// the binary data can be safely decoded, which is unsafe for e.g. UTF8 data and
/// enumerations (perhaps among many other types).
pub unsafe fn from_bytes(bytes: Bytes) -> Self {
let abomonated = abomonation::abomonated::Abomonated::new(bytes).expect("Abomonated::new() failed.");
Message { payload: MessageContents::Binary(abomonated) }
}

/// The number of bytes required to serialize the data.
pub fn length_in_bytes(&self) -> usize {
match &self.payload {
MessageContents::Binary(bytes) => { bytes.as_bytes().len() },
MessageContents::Owned(typed) => { abomonation::measure(typed) },
MessageContents::Arc(typed) =>{ abomonation::measure::<T>(&**typed) } ,
}
}

/// Writes the binary representation into `writer`.
pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match &self.payload {
MessageContents::Binary(bytes) => {
writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
},
MessageContents::Owned(typed) => {
unsafe { abomonation::encode(typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
},
MessageContents::Arc(typed) => {
unsafe { abomonation::encode(&**typed, writer).expect("Message::into_bytes(): Abomonation::encode failed"); }
},
}
}
}

#[cfg(feature = "bincode")]
impl<T: Data> Message<T> {
/// Wrap bytes as a message.
pub fn from_bytes(bytes: Bytes) -> Self {
Expand All @@ -166,7 +119,6 @@ impl<T: Data> Message<T> {
/// The number of bytes required to serialize the data.
pub fn length_in_bytes(&self) -> usize {
match &self.payload {
MessageContents::Binary(bytes) => { bytes.as_bytes().len() },
MessageContents::Owned(typed) => {
::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize
},
Expand All @@ -179,9 +131,6 @@ impl<T: Data> Message<T> {
/// Writes the binary representation into `writer`.
pub fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match &self.payload {
MessageContents::Binary(bytes) => {
writer.write_all(bytes.as_bytes()).expect("Message::into_bytes(): write_all failed.");
},
MessageContents::Owned(typed) => {
::bincode::serialize_into(writer, &typed).expect("bincode::serialize_into() failed");
},
Expand All @@ -197,7 +146,6 @@ impl<T> ::std::ops::Deref for Message<T> {
fn deref(&self) -> &Self::Target {
// TODO: In principle we have aready decoded, but let's go again
match &self.payload {
MessageContents::Binary(bytes) => { bytes },
MessageContents::Owned(typed) => { typed },
MessageContents::Arc(typed) => { typed },
}
Expand All @@ -208,7 +156,6 @@ impl<T: Clone> Message<T> {
/// Produces a typed instance of the wrapped element.
pub fn into_typed(self) -> T {
match self.payload {
MessageContents::Binary(bytes) => bytes.clone(),
MessageContents::Owned(instance) => instance,
// TODO: Could attempt `Arc::try_unwrap()` here.
MessageContents::Arc(instance) => (*instance).clone(),
Expand All @@ -218,7 +165,6 @@ impl<T: Clone> Message<T> {
pub fn as_mut(&mut self) -> &mut T {

let cloned: Option<T> = match &self.payload {
MessageContents::Binary(bytes) => Some((*bytes).clone()),
MessageContents::Owned(_) => None,
// TODO: Could attempt `Arc::try_unwrap()` here.
MessageContents::Arc(typed) => Some((**typed).clone()),
Expand Down
2 changes: 1 addition & 1 deletion communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ByteOrder = byteorder::BigEndian;
/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
/// destination workers, and the length in bytes.
// *Warning*: Adding, removing and altering fields requires to adjust the implementation below!
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct MessageHeader {
/// index of channel.
pub channel: usize,
Expand Down
38 changes: 13 additions & 25 deletions mdbook/src/chapter_4/chapter_4_5.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,21 @@ struct YourStruct { .. }

## The `ExchangeData` trait

The `ExchangeData` trait is more complicated, and is established in the `communication/` module. There are two options for this trait, which are determined by whether you use the `--bincode` feature at compilation, or not.
The `ExchangeData` trait is more complicated, and is established in the `communication/` module. The trait is a synonym for

* If you use `--bincode` then the trait is a synonym for

```rust,ignore
Send+Sync+Any+serde::Serialize+for<'a>serde::Deserialize<'a>+'static
```
where `serde` is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types does not, you should add these decorators to their definition:
```rust,ignore
#[derive(Serialize, Deserialize)]
```
You must include the `serde` crate, and if not on Rust 2018 the `serde_derive` crate.
The downside to the `--bincode` flag is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.
```rust,ignore
Send+Sync+Any+serde::Serialize+for<'a>serde::Deserialize<'a>+'static
```

* If you do not use the `--bincode` feature, then the `Serialize` and `Deserialize` requirements are replaced by `Abomonation`, from the `abomonation` crate. This trait allows in-place deserialization, but is implemented for fewer types, and has the potential to be a bit scarier (due to in-place pointer correction).
where `serde` is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types does not, you should add these decorators to their definition:

Your types likely do not implement `Abomonation` by default, but you can similarly use
```rust,ignore
#[derive(Serialize, Deserialize)]
```

```rust,ignore
#[derive(Abomonation)]
```
You must include the `serde` crate, and if not on Rust 2018 the `serde_derive` crate.

You must include the `abomonation` and `abomonation_derive` crate for this to work correctly.
The downside to is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

## An example

Expand Down Expand Up @@ -140,7 +128,7 @@ impl<D> TreeNode<D> {

We get a new error. A not especially helpful error. It says that it cannot find an `exchange` method, or more specifically that one exists but it doesn't apply to our type at hand. This is because the data need to satisfy the `ExchangeData` trait but do not. It would be better if this were clearer in the error messages, I agree.

We can fix the problem two ways. First, if you would like to use `bincode`, then we update the source like so:
The fix is to update the source like so:

```rust,ignore
#[macro_use]
Expand All @@ -154,10 +142,10 @@ struct TreeNode<D> {
}
```

and make sure to include the `serde_derive` and `serde` crates. Now when we run things (notice the `--features` flag) we see:
and make sure to include the `serde_derive` and `serde` crates.

```ignore
Echidnatron% cargo run --example types --features bincode
Echidnatron% cargo run --example types
Finished dev [unoptimized + debuginfo] target(s) in 0.07s
Running `target/debug/examples/types`
seen: TreeNode { data: 0, children: [] }
Expand Down
4 changes: 1 addition & 3 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ license = "MIT"

[features]
default = ["getopts"]
bincode= ["timely_communication/bincode"]
getopts = ["getopts-dep", "timely_communication/getopts"]

[dependencies]
getopts-dep = { package = "getopts", version = "0.2.14", optional = true }
bincode = { version = "1.0" }
serde = "1.0"
serde_derive = "1.0"
abomonation = "0.7.3"
abomonation_derive = "0.5"
timely_bytes = { path = "../bytes", version = "0.12" }
timely_logging = { path = "../logging", version = "0.12" }
timely_communication = { path = "../communication", version = "0.12", default-features = false }
Expand Down
7 changes: 0 additions & 7 deletions timely/examples/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Wordcount based on flatcontainer.
#[cfg(feature = "bincode")]
use {
std::collections::HashMap,
timely::container::CapacityContainerBuilder,
Expand All @@ -11,7 +10,6 @@ use {
timely::dataflow::ProbeHandle,
};

#[cfg(feature = "bincode")]
fn main() {

type Container = FlatStack<<(String, i64) as RegionPreference>::Region>;
Expand Down Expand Up @@ -97,8 +95,3 @@ fn main() {
})
.unwrap();
}

#[cfg(not(feature = "bincode"))]
fn main() {
eprintln!("Example requires feature bincode.");
}
7 changes: 0 additions & 7 deletions timely/examples/rc.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
extern crate abomonation;
extern crate timely;

use std::rc::Rc;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Inspect, Probe};
use abomonation::Abomonation;

#[derive(Debug, Clone)]
pub struct Test {
_field: Rc<usize>,
}

impl Abomonation for Test {
unsafe fn entomb<W: ::std::io::Write>(&self, _write: &mut W) -> ::std::io::Result<()> { panic!() }
unsafe fn exhume<'a,'b>(&'a mut self, _bytes: &'b mut [u8]) -> Option<&'b mut [u8]> { panic!() }
}

fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub mod pact;
pub type Bundle<T, C> = crate::communication::Message<Message<T, C>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
pub struct Message<T, C> {
/// The timestamp associated with the message.
pub time: T,
Expand Down
Loading

0 comments on commit 3d1b434

Please sign in to comment.