2018-11-20.md

Strings (and beyond) in differential dataflow

As it has been a while, differential dataflow is a sweet programming framework which automatically updates its computations when you change its inputs. It goes really fast, scales, does twenty-seven things no other systems can do, blah blah.

In the course of using differential dataflow, you may find yourself using types that contain strings. In one recent, interesting case of program analysis, the input records look like tab-separated lines of text:

<sun.font.SunFontManager$14: void <init>()> <init>  ()  sun.font.SunFontManager$14  void    ()V 0
<sun.awt.image.VSyncedBSManager$SingleVSyncedBSMgr: void <init>()>  <init>  ()  sun.awt.image.VSyncedBSManager$SingleVSyncedBSMgr   void    ()V 0
<javax.swing.AbstractAction: boolean shouldReconfigure(java.beans.PropertyChangeEvent)> shouldReconfigure   (java.beans.PropertyChangeEvent)    javax.swing.AbstractAction  boolean (Ljava/beans/PropertyChangeEvent;)Z 1
<sun.font.SunFontManager$14: void <init>()> <init>  ()  sun.font.SunFontManager$14  void    ()V 0
...
<java.awt.geom.AffineTransform: void transform(double[],int,double[],int,int)>  transform   (double[],int,double[],int,int) java.awt.geom.AffineTransform   void    ([DI[DII)V  5
<java.awt.geom.AffineTransform: void transform(double[],int,double[],int,int)>  transform   (double[],int,double[],int,int) java.awt.geom.AffineTransform   void    ([DI[DII)V  5
<java.awt.geom.AffineTransform: void transform(double[],int,double[],int,int)>  transform   (double[],int,double[],int,int) java.awt.geom.AffineTransform   void    ([DI[DII)V  5
...
<org.gjt.sp.jedit.pluginmgr.MirrorList: void readXml()> readXml ()  org.gjt.sp.jedit.pluginmgr.MirrorList   void    ()V 0
<sun.nio.cs.StandardCharsets$Aliases: void init(java.lang.Object[])>    init    (java.lang.Object[])    sun.nio.cs.StandardCharsets$Aliases void    ([Ljava/lang/Object;)V  1
<sun.nio.cs.StandardCharsets$Aliases: void init(java.lang.Object[])>    init    (java.lang.Object[])    sun.nio.cs.StandardCharsets$Aliases void    ([Ljava/lang/Object;)V  1
<org.gjt.sp.jedit.pluginmgr.MirrorList: void readXml()> readXml ()  org.gjt.sp.jedit.pluginmgr.MirrorList   void    ()V 0
...

Barf.

These particular records have a type that is in essence an array of strings six-long, [String; 6], and we need to perform maps, filters, joins, grouping, and even fixed-point iteration, all using types like this (likely with other values of 6).

Strings do work!

Before going anywhere too exotic, like Fiji (sigh), the String type totally works in differential dataflow. You can go and do graph computations where the node type is String, or (Vec<String>, bool), or whatever crazy types you kids use to do your graph processing in Javascript (lol, I know... "types").

The String type works but it can be a bit inefficient when compared to something like usize, which is just a much simpler type to work with when we want to hash-distribute records, or sort records, or group them by keys. It is also much more efficient to clone a usize than a String---the latter means we must allocate some memory and most likely de-allocate it at some future point---and generally even using String types means we have some memory indirection along our critical path (farewell, branchless sort performance!).

Interning Strings

While String works, it might be interesting to perform something like string interning, in which each of the string instances is replaced by one representative, deduplicating the actual strings (allocations) being used.

Let's try something same same, but different: replacing each string with a unique integer identifier. We do lose the ability to do string-y operations, like substrings and converting to upper and lower case, but we can still do equality testing and hash-distribution and many of the core things we need to do in our data-parallel compute lifestyle.

In the context of the program analysis project linked above, we can still execute Datalog programs, as they only really care about exact matches between fields.

Single-threaded, string replacement would be super easy:

use std::collections::HashMap;

/// Ensure `string` exists in `map`, return unique identifier.
fn intern_in(string: String, map: &mut HashMap<String, usize>) -> usize {
    let len = map.len();
    *map.entry(string)
        .or_insert(len)
}

Whenever we intern a string, we either return its existing identifier or add it with a new distinct identifier and then return that.

This skips a few fun issues, like do we ever remove interned strings (not here), how do we look up strings from their identifiers (a second map, I guess), can we do this in parallel, and maybe a few others.
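For the reverse lookup, here is a minimal sketch of what "a second map" could look like. The `Interner` type and its `intern` and `resolve` methods are names I made up for illustration; because identifiers are handed out densely from zero, a `Vec` can play the role of the second map.

```rust
use std::collections::HashMap;

/// A hypothetical two-way interner: strings to identifiers and back.
#[derive(Default)]
struct Interner {
    ids: HashMap<String, usize>,
    strings: Vec<String>,
}

impl Interner {
    /// Return the identifier for `string`, adding it if it is new.
    fn intern(&mut self, string: &str) -> usize {
        if let Some(&id) = self.ids.get(string) {
            return id;
        }
        // Identifiers are dense, so the next one is the current length.
        let id = self.strings.len();
        self.ids.insert(string.to_owned(), id);
        self.strings.push(string.to_owned());
        id
    }

    /// Look up the string behind an identifier, if it was ever interned.
    fn resolve(&self, id: usize) -> Option<&str> {
        self.strings.get(id).map(|s| s.as_str())
    }
}
```

This stores every string twice (once per direction), which is one more reason not to get too attached to it.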

We aren't going to stick with this approach, but it is good to see that it isn't all that painful.

Interning Strings IN DIFFERENTIAL DATAFLOW

Let's write a fairly simple computation that will intern strings using differential dataflow.

Can't we just use the above single-threaded fragment? Sure, sure. I did that in the project linked up above. It works, it's totally fine, but it just feels a bit gross and non-robust. For example, we have to do all of the data loading on one thread, because we can't intern the strings in parallel. If the set of strings changes dramatically, we keep all of the pre-existing strings around grotting up the place. Also c'mon, this is going to be neat and you might learn something. :D

As a first guess, couldn't we just intern strings by hashing each of them and using the hash as the integer identifier? Almost, but there could be collisions. How about, if there are collisions, we pick one winner and re-hash the others until they don't collide?

Bingo!

/// Assigns a unique identifier to each element of `collection`.
fn intern<G, D>(collection: &Collection<G, D>) -> Collection<G, (D, usize)>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
{
    collection
        // initialize each string at "round" zero.
        .map(|record| (0, record))
        .iterate(|temp| {
            // propose a candidate hash from (round, string),
            // group by hash and pick at most one winner,
            //       defer non-winners to the next round.
            temp.map(|pair| (pair.hashed(), pair))
                .group(|_hash, input, output| {
                    // first (round, string) wins!
                    output.push((input[0].0.clone(), 1));
                    // if any losers, increment their rounds.
                    for ((round, record),_count) in input[1..].iter() {
                        output.push(((*round+1, record.clone()), 1));
                    }
                })
                .map(|(_hash, pair)| pair)
        })
        // hash before moving the record out of the pair.
        .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
}

This works great. Each string in collection gets a fairly random hash in round zero, and if there are any collisions we pick a winner (the earliest round, breaking ties lexicographically by string) and promote the losers to the next round. We don't really expect many collisions, and we really don't expect any one string to repeatedly collide with other strings (assuming our .hashed() method is worth anything).
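If the rounds are hard to picture, here is a plain single-threaded simulation of the same loop, with no differential dataflow involved. Everything in it is an assumption for illustration: `candidate` and `intern_by_rounds` are made-up names, `DefaultHasher` stands in for `.hashed()`, and the `bits` parameter (assumed to be less than 64) truncates the hash so that collisions actually happen in a tiny example.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeSet, HashMap};
use std::hash::{Hash, Hasher};

/// Candidate identifier for a (round, string) pair, truncated to `bits` bits
/// (assumed to be less than 64) so that collisions are easy to provoke.
fn candidate(round: u64, string: &str, bits: u32) -> u64 {
    let mut hasher = DefaultHasher::new();
    (round, string).hash(&mut hasher);
    hasher.finish() & ((1u64 << bits) - 1)
}

/// Repeats the group-by-hash step until no two strings share a candidate.
/// Assumes far fewer distinct strings than the `2^bits` available slots.
fn intern_by_rounds(strings: &[&str], bits: u32) -> HashMap<String, u64> {
    // Deduplicate, and start every string at round zero.
    let distinct: BTreeSet<&str> = strings.iter().copied().collect();
    let mut state: Vec<(u64, String)> =
        distinct.into_iter().map(|s| (0, s.to_owned())).collect();

    loop {
        // Group the (round, string) pairs by their candidate hash.
        let mut groups: HashMap<u64, Vec<(u64, String)>> = HashMap::new();
        for (round, string) in state.drain(..) {
            let hash = candidate(round, &string, bits);
            groups.entry(hash).or_default().push((round, string));
        }

        let mut changed = false;
        for (_hash, mut group) in groups {
            // The least (round, string) wins; everyone else moves on a round.
            group.sort();
            let mut members = group.into_iter();
            state.extend(members.next());
            for (round, string) in members {
                state.push((round + 1, string));
                changed = true;
            }
        }
        if !changed {
            break;
        }
    }

    // Each survivor's final candidate hash is its identifier.
    state
        .into_iter()
        .map(|(round, string)| {
            let id = candidate(round, &string, bits);
            (string, id)
        })
        .collect()
}
```

The loop stops once an iteration promotes nobody, which is the same fixed point the `iterate` operator is looking for. The difference is that this version recomputes everything on every pass, where the dataflow version only revisits what changed.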

What happens when we have a change in our input strings?

Not very much, which is great news! There was so little interaction between strings that there are similarly sparse changes to their interactions. Perhaps we add a string that evicts a winner, but the cascade should be quite limited. Perhaps when we remove a winner we end up finding the string or two it collided with and naming one of them the new winner. But most of the time we just add or remove a string along with its hash value as an identifier.

This is all incrementalized, distributed, streaming, self-compacting, all that good technology that went into doing high-throughput graph computation, now assigning distinct numbers for your strings.

Using interned strings

Way back up there we had some horrible [String; 6] folks that reminded us about Java and that evil still walks the Earth. How do we wire together a string interning fragment (like just above) with our stream of Java six-tuples?

I'll show you a pretty naive version, and I'll sketch a more sophisticated version for your homework!

The most naive way to do things is to i. extract the candidate strings to intern, and then ii. repeatedly join the six-tuples with the interned strings to extract identifiers for each coordinate at a time.

// Each six-tuple offers six strings to intern.
let interned = intern(&six_tuples.flat_map(|x| x));

let interned_six =
six_tuples
    // map each to (next_key, ([hashes], [strings])).
    .map(|x| (x[0], ([], x[1..6])))
    // closure arguments are (key, our value, the matching value from `interned`).
    .join_map(&interned, |_k,(hs,ss),h| (ss[0], ((hs,h), ss[1..5])))
    .join_map(&interned, |_k,(hs,ss),h| (ss[0], ((hs,h), ss[1..4])))
    .join_map(&interned, |_k,(hs,ss),h| (ss[0], ((hs,h), ss[1..3])))
    .join_map(&interned, |_k,(hs,ss),h| (ss[0], ((hs,h), ss[1..2])))
    .join_map(&interned, |_k,(hs,ss),h| (ss[0], ((hs,h), ss[1..1])))
    .join_map(&interned, |_k,(hs,ss),h| (hs,h));

Ok, now all those join_map closures aren't actually legit Rust, but I hope you get the gist about what is going on. Six times, we extract the first remaining string and use join_map to find us an integer hash value to replace it, and keep the remaining strings.
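To pin down what those six joins are meant to compute, here is the same transformation written against a plain `HashMap`, with no dataflow at all. The function name `intern_six` is hypothetical, and the map stands in for the `interned` collection.

```rust
use std::collections::HashMap;

/// Replace each of the six strings with its identifier.
/// Returns `None` if any of the strings has not been interned.
fn intern_six(
    tuple: &[String; 6],
    interned: &HashMap<String, usize>,
) -> Option<[usize; 6]> {
    let mut ids = [0usize; 6];
    for (slot, string) in ids.iter_mut().zip(tuple.iter()) {
        // One lookup per coordinate, like one join per coordinate.
        *slot = *interned.get(string)?;
    }
    Some(ids)
}
```

The dataflow version does the same six lookups, but as joins, so that the output is corrected whenever either the six-tuples or the identifiers change.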

Arrangements

As a quick aside, let's recall that differential dataflow has a concept of an "Arrangement", which is an indexed representation of a collection that can be re-used without additional cost. In the code above, each time we use interned we re-build an index of the data, and we maintain the six copies as interned changes.

We can skip this by arranging interned once and re-using the arrangement, like so:

// Each six-tuple offers six strings to intern.
let interned = intern(&six_tuples.flat_map(|x| x)).arrange_by_key();

let interned_six =
six_tuples
    // map each to (next_key, ([hashes], [strings])).
    .map(|x| (x[0], ([], x[1..6])))
    // closure arguments are (key, our value, the matching value from `interned`).
    .join_core(&interned, |_k,(hs,ss),h| Some((ss[0], ((hs,h), ss[1..5]))))
    .join_core(&interned, |_k,(hs,ss),h| Some((ss[0], ((hs,h), ss[1..4]))))
    .join_core(&interned, |_k,(hs,ss),h| Some((ss[0], ((hs,h), ss[1..3]))))
    .join_core(&interned, |_k,(hs,ss),h| Some((ss[0], ((hs,h), ss[1..2]))))
    .join_core(&interned, |_k,(hs,ss),h| Some((ss[0], ((hs,h), ss[1..1]))))
    .join_core(&interned, |_k,(hs,ss),h| Some((hs,h)));

Technically a few other things changed. We use join_core rather than join_map because that is the method that consumes arranged inputs (and it wants something like an iterator for output, even though we have just one output for each match). But, this will maintain just one copy of interned and re-use it six times!


Intermission

It was at this point that I proposed some homework. The industrious reader was invited to take the recently introduced sweet join tech to try and perform those joins above without maintaining six copies of the six_tuples collection, each with progressively fewer strings and more integers.

Let's return to our story, and see where the author goes next.


Update (2018-11-21)

Hey I tried that homework up above. I bet you did too, right? Pretty hard, huh?

Despite being pretty sure that the dogsdogsdogs project has positive implications, I think you still need to keep around the original [String; 6] data somewhere, because if any string identifiers change you'll need to push the changed [usize; 6] records.

Of course, if we naively indexed each [String; 6] keyed by each of the six fields we are actually doing worse than above, recording each string six times. So, that sucks. How do we avoid keeping six copies (or more, for numbers larger than six) of the input data around indexed different ways?

How do real databases do this? Can we not just steal from them?

Yes we can.

Row Identifiers IN DIFFERENTIAL DATAFLOW

In a great many database systems and schemas, when you introduce new records to a relation that does not otherwise have a primary key, you automatically get an auto-incremented "row identifier" added in that acts as a primary key. This row identifier lets you speak about the rows without slogging all of their data around all over the place. When performing joins and such a database system can just track the row identifiers involved (and perhaps the subset of attributes) rather than the potentially kilobytes of per-record payload.

Let's do that IN DIFFERENTIAL DATAFLOW!

Specifically, let's assign row identifiers in differential dataflow so that we can more efficiently intern strings, but at the same time we'll come up with a great way to implement relational joins. The first one is probably more important, right?

Let's imagine we start with some collection of string six-tuples:

let horror: Collection<_, [String; 6]>;

and what we want most is to assign unique identifiers to each six-tuple.

// Use the string interning method up above.
let horror_ids = intern(&horror);

Yeah, it turns out we already invented it. Oops. !#$!ing generic programming.

Decomposing records

We are now sitting on records of the form ([String; 6], usize). What comes next?

Next, let's break apart these six-tuples into six collections, one for each attribute, each containing elements of the type (usize, String) corresponding to the record identifier and the value for its associated attribute.

The first step is to break down our ([String; 6], usize) collection to one containing six (usize, usize, String) records for each input record, announcing each attribute and row identifier, and the value. Then we'll partition into six collections using the attribute identifier:

let attr_row_value =
horror_ids
    .flat_map(|(horror, id)|
        horror
            .into_iter()    // six-element iteration.
            .enumerate()    // pre-pend with attr_id.
            .map(|(attr, value)| (attr, (id, value)))
    );

// Partition collections by attr_id (timely magic).
let attr_collections: Vec<Collection<G, (usize, String)>> =
attr_row_value
    .inner                  // inner timely stream
    // timely's partition routes by a u64 index, so cast the attribute id.
    .partition(6, |((a,(i,v)),t,r)| (a as u64, ((i,v),t,r)))
    .into_iter()
    .map(|stream| stream.as_collection())
    .collect::<Vec<_>>();

We now have six collections. One for each attribute, each containing records like (row_id, value). That was amazingly fun, of course, but why did we do this?
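Stripped of timestamps and differences, the decomposition is just a transpose. Here is a sketch over plain vectors, which stand in for the collections; the name `decompose` is made up for this example.

```rust
/// Split `(row, id)` pairs into six columns of `(id, value)` pairs,
/// one column per attribute position.
fn decompose(rows: Vec<([String; 6], usize)>) -> Vec<Vec<(usize, String)>> {
    let mut columns: Vec<Vec<(usize, String)>> = vec![Vec::new(); 6];
    for (row, id) in rows {
        // `IntoIterator::into_iter` iterates the array by value in every edition.
        for (attr, value) in IntoIterator::into_iter(row).enumerate() {
            columns[attr].push((id, value));
        }
    }
    columns
}
```

Each column carries the row identifier next to the value, which is what lets us glue attributes back together later.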

There are at least two reasons that I can think of: string interning and making joins go faster. Let's talk each of them out.

String interning

Obviously this is why we are here. We are all about the strings.

We can independently intern the strings in each of the six collections, like so:

let attr_interned =
attr_collections
    .iter()
    .map(|collection|
        collection
            .map(|(row_id, value)| (value, row_id))
            .join_core(&interned, |_, row_id, hash| Some((row_id, hash)))
    )
    .collect::<Vec<_>>();

Did you notice I used join_core, which is the version that doesn't re-index interned and instead re-uses the common arrangement? :D

This computation produces six collections each containing (row_id, hash) pairs, and we can use those more-or-less as if they were (row_id, value) pairs, almost. At the same time, it maintains each attribute value once, in indexed form for the first input to join_core. This is way better than six times.

Technically we have an additional copy of each attribute value back where we determined record identifiers (that subcomputation stashes each of the argument records).

Making joins better

Our decomposition of our relation into the attributes allows us to do a bit of a la carte columnar join processing, which can be fun.

Let's imagine that instead of just forming each of the (row_id, value) collections we also arrange each of them, resulting in a maintained collection indexed by row_id.

let attr_arrangeds =
attr_collections
    .iter()
    .map(|collection| collection.arrange_by_key())
    .collect::<Vec<_>>();

We can now use these arrangements to re-assemble arbitrary subsets of attributes in the record, with surprisingly small cost.

let attrs_034 =
attr_arrangeds[0]
    .join_core(&attr_arrangeds[3], |r,a0,a3| Some((r,(a0,a3))))
    .join_core(&attr_arrangeds[4], |r,(a0,a3),a4| Some((r,(a0,a3,a4))));

The "surprisingly small" cost here (debatable) is because each of the collection arguments to join_core is pre-arranged, meaning sorted by the key (row_id). When join_core gets to work, it does one sequential swing through each arrangement, picking up the attributes we are looking for.

The attentive reader might complain that the first input to the second join_core operator is not arranged, and will cost, and this is where we break out the dogsdogsdogs delta queries. They use the arranged collections but don't need to maintain intermediate collections (because they respond to changes in e.g. attr_arrangeds[4] with a different dataflow).

We can also independently filter the attributes, and then join. This allows us to tap dance through only the relevant records in each of the relations. For example, consider:

let attrs_034 =
attr_collections[0]
    .filter(|(row_id, value)| value.len() > 4)
    .join_core(&attr_arrangeds[3], |r,a0,a3| Some((r,(a0,a3))))
    .join_core(&attr_arrangeds[4], |r,(a0,a3),a4| Some((r,(a0,a3,a4))));

The arrangement going in to the first join_core has been reduced, but the row identifiers are still in order. All of differential dataflow's join operators are implemented to seek rather than scan between keys, and so a sparse set of filtered keys will allow the operator to zip forward through the keys in attr_arrangeds[3], rather than scanning all of them.
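To make "seek rather than scan" concrete, here is a toy join over two slices sorted by row identifier. It is only a sketch: `seek_join` is a made-up name, it assumes each row identifier appears at most once per column, and it uses a plain binary search (`partition_point`) where a real implementation might use a different search strategy.

```rust
/// Join two columns sorted by row identifier, seeking through `right`
/// with binary search instead of scanning it element by element.
/// Assumes each row identifier appears at most once per column.
fn seek_join<A: Clone, B: Clone>(
    left: &[(usize, A)],
    right: &[(usize, B)],
) -> Vec<(usize, A, B)> {
    let mut output = Vec::new();
    let mut rest = right;
    for (row_id, a) in left {
        // Skip every entry of `right` whose key is below `row_id`.
        let skip = rest.partition_point(|(r, _)| r < row_id);
        rest = &rest[skip..];
        if let Some((r, b)) = rest.first() {
            if r == row_id {
                output.push((*row_id, a.clone(), b.clone()));
            }
        }
    }
    output
}
```

With a sparse `left`, the work is roughly one search per surviving key, rather than one step per entry of `right`.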

This works less well once we've rocked things down to interned string identifiers, but .. well maybe this section should have come before that section. Still working on that.


Performance Update (2018-11-23)

It turns out this computation led me down an interesting performance rabbit hole, and I've just returned with fabulous and wonderful stories!

Let's talk through the intern function up above, and what it does. We will tease out a few performance issues, ones that you might not have even realized were in your power to fix (one still isn't), and see things get a fair bit better by doing so.

Baseline performance

The core of intern's iterative computation takes the full set of (round, string) inputs we have and groups them by their candidate hash value, and produces winners and losers as output (where losers get their round incremented):

    temp.map(|pair| (pair.hashed(), pair))
        .group(|_hash, input, output| {
            output.push((input[0].0.clone(), 1));
            for ((round, record),_count) in input[1..].iter() {
                output.push(((*round+1, record.clone()), 1));
            }
        })
        .map(|(_hash, pair)| pair)

This is all well and good. Let's see how long it takes to intern 10,000,000 integers (not strings, sorry; but this is easier):

Echidnatron% cargo run --release --example test -- 10000000
    Finished release [optimized + debuginfo] target(s) in 0.09s
     Running `target/release/examples/test 10000000`
12.505527384s
^C
Echidnatron%

Hard to know if this is particularly good (it's about 1.25 microseconds per identifier). The process's memory use stabilizes at 1.31GB which is around 131 bytes per identifier. That is a bit larger than we might expect.

It turns out that our program is doing something silly, at least silly for this type of computation. Our group operator receives inputs with largely distinct hash values, and for these inputs it simply reproduces them as output. To stay sane, the group operator maintains both its input and output collections, so it maintains two nearly identical copies.

Seems pretty natural, though. Unless we want to invent a new group implementation, which we (I) don't.

Switching to differences

Our computation determines the "new" pairs (round, string) by incrementing the round field of a relatively small number of records. What if instead we asked group to produce only the changes?

What does that mean? Isn't differential dataflow already incrementalized? Yes, yes, but I mean something different. What if we compute "how should we change temp to avoid collisions?", and then concatenate the results with temp:

    temp.map(|pair| (pair.hashed(), pair))
        .group(|_hash, input, output| {
            // output.push((input[0].0.clone(), 1));
            for ((round, record),_count) in input[1..].iter() {
                output.push(((*round+0, record.clone()),-1));
                output.push(((*round+1, record.clone()), 1));
            }
        })
        .map(|(_hash, pair)| pair)
        .concat(&temp)

It is a bit weird to do this ourselves, rather than asking differential dataflow to do it for us, but we are in the special situation where the group operator has the same input as output, and we want to do input v. output differences, rather than time-by-time differences which differential does well.
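Here is the same idea on plain vectors, for one hash group. This is a sketch under assumptions: `loser_changes` and `consolidate` are names invented for the example, every record has a count of one, and summing differences in a `BTreeMap` stands in for what happens when the concatenated updates get consolidated.

```rust
use std::collections::BTreeMap;

type Pair = (u64, String); // (round, record)

/// For one hash group (sorted, winner first), emit only the changes:
/// retract each loser at its current round and assert it at the next.
fn loser_changes(group: &[Pair]) -> Vec<(Pair, i64)> {
    let mut changes = Vec::new();
    for (round, record) in group.iter().skip(1) {
        changes.push(((*round, record.clone()), -1));
        changes.push(((*round + 1, record.clone()), 1));
    }
    changes
}

/// Sum the differences for equal pairs and drop anything that cancels.
fn consolidate(updates: Vec<(Pair, i64)>) -> Vec<(Pair, i64)> {
    let mut totals: BTreeMap<Pair, i64> = BTreeMap::new();
    for (pair, diff) in updates {
        *totals.entry(pair).or_insert(0) += diff;
    }
    totals.into_iter().filter(|(_, diff)| *diff != 0).collect()
}
```

Concatenating the group with its `loser_changes` and consolidating gives the winner unchanged and each loser one round later. The old version of group reproduced exactly this as output, which is why it ended up holding two near-copies of its input.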

In some weird interpretation we are doing "spatial differencing" rather than "temporal differencing", relating different locations in our dataflow. Never believe that just because someone has fancy names for things means they understand what is going on.

For example, watch this:

Echidnatron% cargo run --release --example test -- 10000000
    Finished release [optimized + debuginfo] target(s) in 0.08s
     Running `target/release/examples/test 10000000`
15.039220586s
^C
Echidnatron%

That took longer. What a pity. On the other hand, it stabilizes at a memory footprint of just 578MB, which is less than half the size of our prior attempt, which is a success! That is some 57.8 bytes per record, which is basically what we should expect given our data structures (seven eight-byte values, 56 bytes in all: the hash, the record, two offsets, a two-dimensional timestamp, and a difference).
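As a sanity check on that byte count, here is the tally written down as a struct. The `Update` type is purely hypothetical: I am assuming each of the seven values is a 64-bit integer, and the real storage keeps these fields in separate columns rather than in one struct per record.

```rust
use std::mem::size_of;

/// Hypothetical flat layout of one maintained update. The field names
/// follow the list in the text; the 64-bit widths are an assumption.
#[allow(dead_code)]
#[repr(C)]
struct Update {
    hash: u64,
    record: u64,
    offsets: [u64; 2],
    time: [u64; 2],
    diff: i64,
}

/// Bytes per update under the layout above: seven eight-byte values.
fn bytes_per_update() -> usize {
    size_of::<Update>()
}
```

That comes to 56 bytes, against the 57.8 bytes per record observed, so there is not much overhead left to explain.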

So what is going on that makes it go slower, despite using half the memory?

Optimizing iteration

The short version (there is a longer version; it involved cursing) is:

Our iterate method develops all (round, string) pairs, and so at some point (the beginning) we need to bring in these initial values, apply one step of our logic, and then subtract the initial values. We have to do this even when nothing changes, because we have to check that nothing has changed. When nothing changes this difference cancels and we don't have anything to write down (yay!) but we had to do the work to determine this (oh well...).

To work around this, we will need to write a different computation. A computation that doesn't iteratively develop the full collection of (round, string) pairs, but perhaps instead iteratively develops the changes needed to apply to the initial collection (rather than changes to temp).

Here is how I wrote that:

/// Assigns a unique identifier to each element of `collection`.
fn intern<G, D>(collection: &Collection<G, D>) -> Collection<G, (D, usize)>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
{
    // capture the initial (round, record) pairs.
    let init = collection.map(|record| (0, record));

    init.filter(|_| false)
        .iterate(|diff|
            // Bring `init` into scope and add to `diff`.
            // Then determine changes relative to `init`,
            // which means subtracting records at round 0.
            init.enter(&diff.scope())
                .concat(&diff)
                .map(|pair| (pair.hashed(), pair))
                .group(|_hash, input, output| {
                    // keep round-positive records as changes.
                    let ((round, record), count) = &input[0];
                    if *round > 0 {
                        output.push(((0, record.clone()), -*count));
                        output.push(((*round, record.clone()), *count));
                    }
                    // if any losers, increment their rounds.
                    for ((round, record), count) in input[1..].iter() {
                        // Subtract a *round zero* instance of the record.
                        output.push(((0, record.clone()), -*count));
                        output.push(((*round+1, record.clone()), *count));
                    }
                })
                .map(|(_hash, pair)| pair)
        )
        .concat(&init)  // concat init *outside* the loop.
        .map(|pair| { let hash = pair.hashed(); (pair.1, hash) })
}

This computation develops what is very likely a small collection diff, although it does use init which may not be small. Fortunately, it uses init only once, in the input to group. Further, as it does not start from init (our filter fixes that) we don't end up wasting any time subtracting it back off.
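The group logic here is the subtle part, so here it is again as a plain function over one hash group. Treat it as a sketch: `changes_from_init` is a made-up name and every count is assumed to be one, whereas the closure above carries the counts through.

```rust
type Pair = (u64, String); // (round, record)

/// For one hash group (sorted, winner first), emit changes relative to
/// the round-zero instance of each record that `init` already supplies.
fn changes_from_init(group: &[Pair]) -> Vec<(Pair, i64)> {
    let mut changes = Vec::new();
    for (position, (round, record)) in group.iter().enumerate() {
        // The winner keeps its round; every loser advances by one.
        let next = if position == 0 { *round } else { *round + 1 };
        if next > 0 {
            // Cancel the round-zero copy from `init`, assert the new round.
            changes.push(((0, record.clone()), -1));
            changes.push(((next, record.clone()), 1));
        }
    }
    changes
}
```

A winner that already sits at a positive round has to be emitted again, because diff is recomputed from scratch on each iteration: if the winner said nothing, its round-zero copy in init would come back. A loser at round zero and a settled winner at round one produce the same two changes for the same record, which is what lets diff reach a fixed point.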

Echidnatron% cargo run --release --example test -- 10000000
    Finished release [optimized + debuginfo] target(s) in 0.08s
     Running `target/release/examples/test 10000000`
6.519266834s
^C
Echidnatron%

This works out to 652 nanoseconds per record (sorting by hash, I think), and the memory footprint ends up at 621MB which is approximately what we had up above. Maybe a touch more for reasons I don't currently understand, but good enough for us for now.

We were very fortunate with this computation because it was possible to reason about changes relative to init, as we knew that the only thing that happens to round counters is that they are incremented from zero. Our second implementation, slower but with a smaller footprint, is probably the more reasonable style of computation to expect people to be able to write for more general computations.