Skip to content

Latest commit

 

History

History
284 lines (221 loc) · 17.2 KB

2024-10-11.md

File metadata and controls

284 lines (221 loc) · 17.2 KB

Dataflow and Columns and WASM, Oh My!

A bit of a show and tell this post, bringing together a few pieces of lately synergistic technology.

  1. Timely dataflow has recently-ish acquired the ability to ship "containers" of data, allowing you to bundle your records in a representation you like rather than needing to use Vec<T>. Props to Moritz Hoffmann for this work.
  2. The columnar crate can act as one such container, laying out your complex struct and enum types in relatively few allocations.
  3. Web Assembly (WASM) provides a handy escape hatch for describing transforms on data, if only you could get the data in to and out of a runtime efficiently.

The tl;dr is that you can write timely dataflow operators that host a wasmtime instance, and copy data in and out with just a few memcpys if you have used columnar for your containers.

This doesn't mean alarmingly fast dataflows that you couldn't have before, or some performance unlock for WASM, or anything all that deep just yet. It's potentially useful for us at Materialize where it's helpful to have escape hatches when the function you need isn't yet implemented. It could also be interesting as a way to take other languages that can target WASM and bring them online in a distributed dataflow.

For the moment, it's just show and tell, though.

Columnar data layouts

The columnar crate is the first thing I blogged about, nearly ten years ago. It is some Rust traits and types that allow you to transform sequences of complex types to (multiple) sequences of simple types. Ultimately, your structs, enums, Vecs, and even recursive types can end up as a fixed number of allocations of primitive types (things like u8 and i32).

The trick is that you don't get your data type back the same way you put it in. While you may have inserted a (String, i64), and you may expect to see a &(String, i64) when you look, you'll instead get back a (&str, &i64). This isn't what you inserted, which is a light bummer, but it means that we can store your data as a [u8] and [i64], as well as a list of delimiters for the strings (u64 for now). You trade away specific control over the layout of the type for a representation that is much simpler for the computer.

Representing a collectiono of data as a few slices of primitive types means it's pretty easy to go back and forth between binary and typed data. Each slice of primitive type converts to a slice [u8] of bytes, and if correctly aligned a slice of bytes goes back to the primitive types just as easily. It's essentially zero cost to move between the representations.

In particular, the columnar crate comes with some serialization code that encodes and decodes typed data as length-delimited binary data. This moves between a single binary slice and the typed containers, which amounts to a memcpy in one direction to append the various slices into one, and .. just parsing out the lengths in the other direction. The cheap serialization can remove a dependence on more general serialization (e.g. Rust's serde), and make moving across various system boundaries much cheaper.

Timely dataflow containers

For time immemorial, timely dataflow has used Vec<T> to shuttle about your streams of records of type T. It seemed like a great way to do it at the time, and it has held up relatively well.

At the same time, ownership-based allocations increasingly feel like an antipattern for scalable data processing. I've written before about how serialized data can be faster than owned data, borrowing from the Broom HotOS paper. If you think about the difference between shipping Vec<String> and (Vec<u8>, Vec<usize>), the main thing the former provides is ownership over the memory backing the String. However, that ownership is also a responsibility: to de-allocate it once you are done using it. This is a responsibility that costs, especially in a multi-threaded environment, as reconstituting all these buffers ends up being a bit of an allocator microbenchmark.

Moritz Hoffmann recently led some work to allow one to swap out Vec<T> with a more general Container implementation, which only needs to be able to present data to you. Everywhere we had opinions about a Vec<T>, it turns out we only really needed to know about the length, and to have the ability to iterate and drain. A not insubstantial refactoring later, most of timely dataflow allows you to plug in your own container implementations.

This is handy not only because you can streamline your memory ownership responsibilities, but also because the data may just be more conviently arranged. Columnar layouts, for example, can admit much more efficient logic when you need to grind over them at high throughputs. And, as we'll see, these layouts are prepped for success should we need to shell out to other frameworks.

Web assembly (WASM)

Web assembly is a bigger thing than I'm going to be able to explain here. For our purposes, it has the two properties that

  1. It can be targeted by Rust code: you write code in Rust but get out web assembly.
  2. It can be hosted by Rust code: you write Rust and can call out to web assembly.

This opens up an approach to iterative development that avoids one of Rust's main pain points: the recompilation loop. Materialize, for example, takes about 20-30 minutes for a release build, and it's not all that much faster even if the change was localized to one function. Stepping even further back, the feedback cycle for introducing a new function into a system like Materialize is often weeks, due to design, testing, and the seriousness of backward compatibility guarantees.

Web assembly provides an interesting pressure release valve for when you want to try out something new, but want to iterate faster than building a new version of everything.

The caveats around WASM are unsurprising. Moving between Rust's runtime and the WASM's sandboxed runtime involves some amount of work, and some amount of copying of data. There are other caveats for sure, but we'll ignore the ones we don't plan on addressing in this post, largely out of naivete.

Wiring the parts together

Imagine we have some data we are dataflowing around, each record an instance of this type:

struct WordCount {
    text: String,
    diff: i64,
}

This tracks the timely wordcount example, which uses pairs rather than a struct, but .. otherwise moves this sort of information around. If you dive in to the example, we've written two operators:

  1. A "flatmap" operator that segments a line of text into words (with the same diffs).
  2. A "reduce" operator that maintains a running count for each word (and emits any changed totals).

Both of these are written in Rust, and that's totally fine. The compile time here isn't really a problem, but let's pretend. Imagine you wanted to do you own thing to the words that come out, perhaps lowercasing them, or some other transformation, and you don't want to have to commit to it when the Rust program is written. This is where WASM comes in: we can write a timely operator that loads up WASM written after the fact, and applies it to the records it sees.

Writing operator logic in WASM

Let's start by writing the Rust code that we'll turn into WASM. This is a bit shorter and more self-contained than the timely dataflow stuff.

Let's start by looking at the whole program.

use columnar::{Columnar, Index, Len};
use columnar::bytes::{AsBytes, FromBytes, serialization::decode};

#[no_mangle]
pub fn act(ptr: *const u8, len: usize) -> *const u8 {

    // Acquire the binary slice indicated by the arguments.
    let slice: &[u8] = unsafe { std::slice::from_raw_parts(ptr, len) };
    let words: &[u64] = bytemuck::cast_slice::<_, u64>(slice);

    // Interpret the binary data as a sequence of (String, i64) pairs.
    let mut decoder = decode(words);
    let decoded = <<(String, i64) as Columnar>::Container as AsBytes>::Borrowed::from_bytes(&mut decoder);

    // Compute the sum of the lengths of the strings and the numbers.
    let mut tallies: Vec<u64> = Vec::with_capacity(decoded.len());
    for (name, num) in decoded.into_iter() {
        tallies.push(name.len() as u64 + *num as u64);
    }

    // Capture a reference to the backing binary data, and return it.
    let bytes: &[u8] = tallies.as_bytes().next().unwrap().1;
    bytes.as_ptr()
}

To start, the methods we define want to be in terms of simple argument types. Getting complicated types across the Rust-WASM bridge is non-trivial, and it's potentially much better just to hand a pointer to some serialized data (and a length). Our function will also return just a pointer; I would have it return a length too, but it seems that multi-valued returns are not seamlessly handled yet. It turns out that the caller won't need to know the length (the serialized format is self-delimiting).

The first thing the method does is grab the referenced binary data, unsafely forming a byte slice. I'm actively trying to remove unsafe from my life, so if you know a more safe way to do this let me know. The byte slice is then cast to a [u64] slice, which is how we'll manage alignment. If the bytes are not correctly aligned this will fail, so we'll want to ship aligned bytes (we will).

The next thing the method does is convert the [u64] slice into typed data. This is where columnar comes in, and all it really does is block out subslices for different allocations (e.g. characters, integers, and offsets). Although this looks at lengths in words, it doesn't write to anything other than the fields of the struct it forms.

The method continues to do some "business logic". In this case, for each (String, i64) it adds the string length to the integer. I don't know I couldn't really think of anything in the moment. It puts the results into a columnar container, which for primitive types like u64 is just a Vec.

Finally, we use columnar's ability to get out binary slices, which in this case is just a transmutation from &[u64] to &[u8], handled behind the scenes by the helpful bytemuck crate. We return the resulting pointer, so that the caller can read out the results.

I hope you agree that while potentially fiddly (I can confirm), the whole thing is not overwhelming. Only the third block of code is not boilerplate, and everything else can largely be copied and pasted. If you type

cargo build --target wasm32-unknown-unknown --release

you'll end up with a .wasm file in your target/ directory that you can point interested folks at.

Writing an operator for WASM

Now that we have a .wasm file that describes what we would like to do with each record, we need to make that happen.

If you are familiar with timely dataflow, you'll know that dataflows get written in a "fluent" style that is a sequence of method calls. In the wordcount example above, we see things like

input
    .to_stream(scope)
    .container::<Vec<_>>()
    .flat_map(|(text, diff): (String, i64)|
        text.split_whitespace()
            .map(move |word| (word.to_owned(), diff))
            .collect::<Vec<_>>()
    )
    .container::<Vec<_>>()
    ...

This turns an input handle into a stream in the scope, announces the container type, applies a flat_map that segments the lines of text into words, and .. announces a container type again. The container types can vary, and that means we have to name them when they do. And when they don't.

We are going to write an operator not unlike the flat_map, except it will be a bit wordier. Most of these operators, including flat_map, are thin wrappers around more general operator builders. There is for example a unary operator builder that takes an input stream, some operator logic, and produces an output stream. We'll build one of these, and then talk through its implementation.

// This picks up from where the line example above left off.
.container::<Container>()
.unary(Pipeline, "WASM", |_,_| {

    use wasmtime::*;

    // Prepare the moving parts we'll need to interact with our WASM logic.
    let engine = Engine::default();
    let module = Module::from_file(&engine, "/Users/mcsherry/Projects/my_workspace/target/wasm32-unknown-unknown/release/wasm_module.wasm").unwrap();
    let mut store = Store::new(&engine, ());
    let instance = Instance::new(&mut store, &module, &[]).unwrap();
    let act = instance.get_typed_func::<(i32, i32), i32>(&mut store, "act").unwrap();

    // An unrelated temporary used by timely.
    let mut container = Default::default();

    move |input, output| {
        input.for_each(|time, data| {

            unimplemented!()

        })
    }
)
.container::<Container>()

The first thing you might notice is that we have a Container type now. We aren't using Vec<_> any more, and instead have a type that uses a columnar layout. I wasn't entirely sure where to call this out, so let's do it here:

// Creates `WordCountContainer` and `WordCountReference` structs,
// as well as various implementations relating them to `WordCount`.
#[derive(Columnar)]
struct WordCount {
    text: String,
    diff: i64,
}
// A short-hand for the columnar timely container for `WordCount`.
type Container = Columnar<<WordCount as columnar::Columnar>::Container>;

Ok, that out of the way, the code above creates a unary operator with some arguments you can ignore (Pipeline indicates that no data exchange is required, "WASM" is a name for the operator, and the closure arguments are forcibly ignored). The main other thing the code does so far is prepare several wasmtime types, including one that goes and fetches our .wasm file and gets it ready to go. All of this is only set-up for the moment. We aren't looking at actual data yet. The body returns a closure that acts on input and output, and which will be called for each container's-worth of data that arrives.

The closure is unimplemented above, so let's do that now.

move |input, output| {
    input.for_each(|time, data| {

        // Acquire the presented data.
        data.swap(&mut container);

        // Serialize the data, as it is not yet *one* contiguous byte slice.
        use columnar::bytes::AsBytes;
        let mut serialized = Vec::new();
        columnar::bytes::serialization::encode(&mut serialized, container.store.as_bytes());
        let bytes: &[u8] = bytemuck::cast_slice(&serialized[..]);

        // Write the data into the WASM module's memory, at 8 not 0 because ..
        let memory = instance.get_memory(&mut store, "memory").unwrap();                
        memory.write(&mut store, 8, bytes).unwrap();

        // Call the function and get a pointer to the resulting binary data.
        let result = act.call(&mut store, (8, bytes.len() as i32)).unwrap() as usize;

        // Track down the result and interepret it.
        let memory = instance.get_memory(&mut store, "memory").unwrap();
        let slice = memory.data(&store);
        let nums: &[i64] = bytemuck::cast_slice(&slice[result ..][.. (8 * container.len())]);

        // Update the diffs to what was returned by the logic.
        container.store.diff.clone_from_slice(nums);
        output.session(&time).give_container(&mut container);
    });
}

The first line tracks down the presented data and moves it out of an ambiguous representation (owned, serialized, none of the above) . The next block serializes the data, again just a matter of three memcpy calls; we could avoid this by serializing directly at the WASM memory. The next block copies the serialized data into the WASM memory; I had to write at address 8 because writing at 0 resulted in panics in bytemuck (perhaps it doesn't expect data at a zero pointer?). We call the act function, which drops in to WASM and runs our logic, and then returns the address of the results. We snatch the results, interpret them, and use their values to clobber the diffs (another memcpy; a benefit of columnar layout).

All in all, subtle and fidgety, but not too terrible. We do have to get memory in to and out of the WASM runtime, and while we have a few more copies than that, I suspect we could get rid of them with some work. As with the WASM generating code itself, much of this is boilerplate that you wouldn't rewrite. With a bit more experience, it's possible it could be a provided method that only needs to know about the .wasm file and the returned type.

Shown and Told

That's all I've got at the moment. Next steps are to try this out in anger and see if there are any red flags from higher intensity workloads. Although the above produces the correct answers, it doesn't do much, and it isn't run on much data yet.

I'm pretty excited about how the container work is coming together. Again, props to Moritz for the foresight about containers and the patterns they unlock. I'm especially excited about how much positive reinforcement the lines of work have for each other.