Skip to content

Latest commit

 

History

History
303 lines (204 loc) · 25.7 KB

2017-07-27.md

File metadata and controls

303 lines (204 loc) · 25.7 KB

Memory management for big data

In this post we are going to talk about something that lies at the core of most performance issues in big data systems. Not only am I going to claim that Rust makes your life better than whatever JVM stack you were using before, we are actually going to try and go beyond what Rust does for you, playing catch-up with some research systems in the area.

Are you ready for excitement? Let's talk about managing memory.

This is probably the moment where you sigh, tricked, as if this were some "Seven Hadoop performance secrets Oracle doesn't want you to know" post. I certainly remember back when I couldn't imagine anything less exciting, especially since "memory management" was almost always the reason my otherwise well-considered program went slow as sap.

More recently, I've taken an interest in understand how computers work and whether I can bend them to my will. Can I make the computer do the thing I'm pretty sure it should be able to do, independent of whichever screw-head wrote the layer of code I have to write against. "Managed languages" and Rust are on opposite ends of the spectrum here, with the former building up lots of infrastructure to "help" but which often unintentionally "sucks". The latter, Rust, does a pretty solid job of getting out of your way, but of course provides some default behavior when you need it.

It's going to be pretty good default behavior, but we are going to try something more awesomer.

Some motivation

Imagine you are tasked with doing some big data computation stuff, and instead of just needing to do grep or wordcount or maybe even graph_processing, you need to work with real data types. Let's take an example I stole from Erik Tryzelaar (whose patrilineal heritage is clearly "galactic overlord"), who in turn borrowed (conquered?) it from a CloudFlare Go serialization benchmark.

impl Log {
    pub fn new() -> Log {
        Log {
            timestamp: 2837513946597,
            zone_id: 123456,
            zone_plan: ZonePlan::FREE,
            http: Http {
                protocol: HttpProtocol::HTTP11,
                status: 200,
                host_status: 503,
                up_status: 520,
                method: HttpMethod::GET,
                content_type: "text/html".to_owned(),
                user_agent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.146 Safari/537.36".to_owned(),
                referer: "https://www.cloudflare.com/".to_owned(),
                request_uri: "/cdn-cgi/trace".to_owned(),
            },
            origin: Origin {
                ip: "1.2.3.4".to_owned(),
                port: 8000,
                hostname: "www.example.com".to_owned(),
                protocol: OriginProtocol::HTTPS,
            },
            country: Country::US,
            cache_status: CacheStatus::Hit,
            server_ip: "192.168.1.1".to_owned(),
            server_name: "metal.cloudflare.com".to_owned(),
            remote_ip: "10.1.2.3".to_owned(),
            bytes_dlv: 123456,
            ray_id: "10c73629cce30078-LAX".to_owned(),
        }
    }
}

This is some type Log that has a bunch of "real" data in it. Strings, and enums, and .. strings mostly. We could make it more complicated (lists of strings, lists of log entries, lists of galactic dominions ..), but this should be good enough to make our point.

Imagine we have a great deal of these log entries, and our computation needs to process them. We accept a stream of log entries on the one hand and on the other do crazy things like maybe look up entries by timestamp so we can see what happened at some moment in time that might explain untoward events in our system. In short, we need lots of these things in memory in our system, and we'd like their structural complexity (pointers and such) not to screw things up horribly.

How would existing systems screw things up, currently?

Garbage collection and big data

Many of you are probably familiar with a garbage collected language, from Java, to Python, to Go. These languages provide an experience in which you are not required to think about allocation of memory, for example when you create a new log entry. Nor (and this is the important part) are you required to think about the de-allocation of memory, whenever you finish that last bit of code that could possibly look at the memory you allocated for your log entry (and its many strings).

Most of the garbage collection your languages use is what is called a "tracing garbage collector", which means that the system maintains and often revisits all of the live allocations it can reach from the "roots" of your program (think: variables on your stack and what you can reach from them, transitively).

Tracing garbage collectors are accurate (if you can reach something, best to keep it; if you can't reach a thing, no reason to keep it). They can also be pretty efficient for some patterns of programs, in particular those programs in which most data are ephemeral: tracing collectors do work proportional to the number of reachable allocations, and if there aren't many of those at any point, great news!

Awkwardly, most big data computations are defined by .. having lots of data. And not just briefly having lots of data. Any interesting stream processing system works by maintaining a large amount of historical state that it regularly consults and updates based on the data moving past it. Such a system does not regularly flush out its state, unless it has run out of memory and is now crashing (my current theory: "fault-tolerance" in systems built on managed languages is just a super-primitive form of garbage collection; prove me wrong).

When we build Naiad many years back, while we got our system up and running surprisingly quickly (I was surprised, anyhow), we then spent forever-and-a-half optimizing the performance, and there was a great deal about changing our system and programs so that the garbage collector wouldn't have a fit. For example, if you used String anywhere in your program, meaning we maintain a large indexed collection of Strings for you, the C# garbage collector would vanish for seconds at a time (once: 39 seconds) trying to recall just which of the gigabytes of allocations were still live.

Of course, garbage collectors are always getting better, is something you often hear from people who make garbage collectors for a living. Another thing we heard (from the C# performance team) was "if your program's allocations aren't 90% garbage, you might want to rethink your program". Some rethinking happened; they were right about that.*

Anyhow, I don't know which GC is currently the best, and it may be that in the past few years they have discovered secret performance technology that makes them wonderful, but the current trend is that folks on GC'd platforms (the JVM mostly) are moving their data off of the managed heap, and handling their memory allocations manually. We won't talk about them any more.

*: Several MSR colleagues did that rethinking and ended up with a HotOS paper about a system they called Broom, whose position was that system-wide tracing garbage collection was just going to be an issue in big data systems, and perhaps there were (specific) better approaches to use. They proposed using regions, something that we will get back to a bit later in the post. If you'd like to take a break and read the paper, great! Our goal here is to do the things they recommend, but without building a new system.

Ownership and big data

Rust has a different approach to memory management, in that one of its defining features is that it provides automatic memory management without a tracing garbage collector. Automatic memory management just means that you don't need to manually allocate and free things, but as we will see it doesn't mean "all the properties of a tracing garbage collector", for better or for worse.

In Rust, all bindings (of data to a variable or field) are uniquely owned, in the sense that whenever that binding goes away (e.g. when overwritten, when the containing scope ends, etc) we know that there are no other owners of the data, and we can (and should!) clean up it and any of its resources. Like, right away! We don't have to wait to scan the heap or whatever it is the tracing garbage collectors do.

Consider our log type up above. When we construct a new one (which is what the Log::new() method does), we are going to allocate up some space for the ten strings it holds. We'll use it for a while, and we might even mutate some of the strings, maybe grow them a bit, that sort of thing. But, at some point we use the log entry for the last time, and discard our handle to it. At this moment we can safely go in and de-allocate whatever is pointed at by those ten pointers that were initially allocated for our strings.

There are a bunch of issues with this approach, most prominently that if we want a copy of the data for some reason, maybe to hand to another bit of code without giving up our copy, we would need to clone the data. That means another ten new allocations, assuming we stick to the discipline that whenever you discard a Log you should de-allocate what memory it owns.

Rust has various solutions for this problem, including reference counting (Rc and Arc) which allow shared ownership, as well as "borrowing" (the & and &mut sigils) which is Rust's way of saying "you can see / use my copy, but give it back when done, rather than de-allocating it, please".

Borrowing is the sort of thing that stresses out many folks new to Rust, mostly (I believe) because they haven't had to think about it before. If you aren't familiar, it boils down to grabbing a raw pointer and convincing the compiler that the referrent will be stable for at least as long as your pointer is around. The "convincing" part is what is stressful, but if you get it right then your program can share out references to the existing allocation and not continually re-/de-allocate memory as it does.

The main restriction that borrowing introduces, insisting on the stability of the original data (it should not be mutated, moved, de-allocated), has seemed to rarely be a problem for me in big data computations. Although the data is big, and while the set of data are always changing, each individual datum is not itself changing. If you were running wordcount, you would expect a stream of well-defined words as inputs, not Strings whose contents change each time you look at them. Although your core logic will need to do more interesting mutations, the data plane of dataflow systems has seemed to me to be well accommodated by borrowing.

Regions and big data

Regions are a third way of doing memory management, and while you may not have heard of them, you've probably used regions before.

Region based memory management works from the premise that some collection of allocations are going to be around for the same amount of time, and if so it makes sense to locate them all in the same small pile of memory, so that when we are done we can just drop that whole allocation, no questions asked. In the case of Log up above, we could stash all ten of the allocations in the same place, using just one allocation, and then do just one de-allocation when we are finished.

This is very similar to another thing that computers do: serialization. When one computer needs to send data to another computer, it is forced to bundle up the data (and whatever that data can reach), put it all in a contiguous hunk of memory as raw bytes (not pointers), and send those bytes to its friend. Its friend receives the bytes, and re-interprets them as meaningful data using whatever agreed upon schema it shares with its friend.

Serialization is a lot like user-mode regions. The serialized data live in one allocation and can be created, cloned, and dropped with no more than one allocation (and often fewer, if we plan to serialize a batch of data together).

Serialization gets a bit of a bad rap, because in a lot of systems (e.g. Java and C# for sure) it is dog slow. Also, people often serialize to and from text based formats like JSON. Don't let their miserable performance turn you off from serialization. It can be faster than using the native representations and operations.

Say what?

Serialization can be faster than asking your language to make a new instance of an object for you. That Log::new() method up above? Way slower than serializing a Log into a fresh Vec<u8> and then deserializing the contents back out. From Abomonation's serde benchmark, which compares these methods on Log:

Echidnatron% cargo bench
    Finished release [optimized] target(s) in 0.0 secs
    Running target/release/deps/abomonation-ef6b60a9e882069a
running 3 tests
test log_decode        ... bench:           8 ns/iter (+/- 2) = 65500 MB/s
test log_encode        ... bench:          74 ns/iter (+/- 12) = 7081 MB/s
test log_new           ... bench:         371 ns/iter (+/- 39)

test result: ok. 0 passed; 0 failed; 0 ignored; 3 measured; 0 filtered out

Echidnatron%

Serializing a Log to existing memory is about five times faster than allocating a new one.

How could this possibly be? Don't we need to end up with the same thing that new() gives us, and if so how could bypassing the language's preferred mechanisms possibly be better? The answer lies in borrowing, and our implicit (now explicit, sorry) willingness to work only with borrowed data.

Restricting ourselves to work only with borrowed data imposes limitations. We essentially get read-only access to the data, which means we can't take our Log and rewrite the hostname field. And if we want an owned copy of the hostname field to stash somewhere else we will need to clone it. But, the system wasn't planning on handing out ownership of the record fields anyhow; we were going to have to clone things if we wanted that. But this way, we don't have to clone things if we don't need an owned version.

Let's imagine for now that we are comfortable with using borrowed data, or at least deferring clone and any allocations until we actually need owned data. What does a serialization library like Abomonation do under the hood? Let's look at the encode and decode method signatures first:

unsafe fn encode<T: Abomonation>(typed: &T, bytes: &mut Vec<u8>);
unsafe fn decode<T: Abomonation>(bytes: &mut [u8]) -> Option<(&T, &mut [u8])>;

These two methods are foremost unsafe. They muck around with internals that Rust doesn't normally want you touching. They may have undefined behavior, because the Rust folks are still working out what constitutes undefined behavior. At the moment Rust's UB includes memcpy of structs with padding bytes, which Abomonation does. I'm guessing this won't eventually be UB, but there are other, more horrible things going on here.

From its signature, encode takes a reference to some type T as well as a writeable reference to a vector of bytes. These are the source data and destination vector, respectively. Once done, bytes will be extended by a bunch of bytes representing typed. To a first approximation, encode calls memcpy on all the data it can reach from typed, following pointers and everything.

From its signature, decode is a complicated beast. Its only argument is bytes, a writeable reference to a slice of bytes. This means we can change the bytes themselves, but we can't change the underlying allocation itself. There is an additional input in the type parameter T which tells us what type we are hoping to find, without supplying an instance of T as an argument. Instead, what we get as output is either None (if decoding failed) or Some wrapped around a pair of reference to a T (the &T) and whatever bytes remain having burned some on the decoding. To a first approximation decode just fixes up the pointers to be addresses internal to the allocation.

We will defer judgement about the propriety of this scheme to some future post.

So, regions. Fast serialization is roughly equivalent to user-mode regions allocation, and spares us a great deal of overhead that comes with using the default allocation strategies. We feel a bit sad that we weren't allocating directly into the region, and we will need to through how we fall back to owned data when we need to inter-op with code that expects it (worst case: do the allocations we just skipped, but only as needed; better case coming up).

But let's look a bit more at serialization, and Abomonation specifically, to see what sorts of things we can easily do.

Serialization performance

This is partly me kicking the tires, and partly talking through the sorts of things you should be able to do with serialized data. I'm writing this without having run (or even written) most of the benchmarks, so it may be that not everything turns out to be a great idea. Let's find out!

We've already talked a bit about what numbers you could get with a type like Log when you compare new() with whatever encode() and decode() do. Let's take a peek at the benchmarking code and see.

Allocating and de-allocating

The first benchmark repeatedly allocates new log entries. Rust has a handy built-in benchmarking infrastructure I used for all of this. You write code like so:

#[bench]
fn log_new(b: &mut Bencher) {
    b.iter(|| Log::new());
}

and Rust's benchmarking harness will run this a lot and produce output like so:

test log_new           ... bench:         371 ns/iter (+/- 39)

Apparently it takes roughly 371 nanoseconds to allocate and de-allocate (don't forget the second!) a Log. There are ten strings in each Log, which means ten allocations and de-allocations. There are also going to be ten-ish calls to memcpy to load up the contents of each of the strings, and probably another call for the struct itself (which is 288 bytes, even without the allocations).

This isn't any better if we try and use clone() to get a copy of an existing Log:

#[bench]
fn log_clone(b: &mut Bencher) {
    let log = Log::new();
    b.iter(|| log.clone());
}

which takes just a little longer:

test log_clone         ... bench:         382 ns/iter (+/- 30)

Encoding and decoding without allocation

The encode and decode benchmarks serialize into and deserialize from existing memory.

#[bench]
fn log_encode(b: &mut Bencher) {
    let log = Log::new();
    let mut bytes = vec![];
    unsafe { encode(&log, &mut bytes); }
    b.bytes = bytes.len() as u64;
    b.iter(|| {
        bytes.clear();
        unsafe { encode(&log, &mut bytes); }
        test::black_box(&bytes);
    });
}

and

#[bench]
fn log_decode(b: &mut Bencher) {
    let log = Log::new();
    let mut bytes = vec![];
    unsafe { encode(&log, &mut bytes); }
    b.bytes = bytes.len() as u64;
    b.iter(|| {
        test::black_box(unsafe { decode::<Log>(&mut bytes) });
    });
}

The test::black_box is how we indicate to the benchmarking infrastructure that we are not allowed to optimize out the argument. We did this in the above examples by returning the result of the work, but the results here have borrowed lifetimes in them, and cannot be returned.

These two methods do roughly the minimal amount of work possible, and move pretty briskly:

test log_decode        ... bench:           8 ns/iter (+/- 0) = 65500 MB/s
test log_encode        ... bench:          77 ns/iter (+/- 55) = 6805 MB/s

The numbers are slightly different from up above because I ran things again. Pretty exciting news.

Encoding and decoding with allocations

The two methods above re-use existing memory, which does mean that we can't hand the result to another person in the same way we could with a new or cloned Log. To do that, we could also look at a version that clones encoded data into a new allocation and there decodes it (to correct pointers), resulting in something it could hand out:

#[bench]
fn log_decode_clone(b: &mut Bencher) {
    let log = Log::new();
    let mut bytes = vec![];
    unsafe { encode(&log, &mut bytes); }
    b.bytes = bytes.len() as u64;
    b.iter(|| {
        let mut bytes = bytes.clone();
        test::black_box(unsafe { decode::<Log>(&mut bytes) });
    });
}

This chats with the allocator to get a new Vec<u8>, but it only does so once, and for the appropriate amount of memory for a full encoded Log. Additionally, when it copies memory it does so once, rather than eleven distinct times from possibly disjoint regions.

test log_encode_clone           ... bench:          51 ns/iter (+/- 8) = 10274 MB/s

Of course, since the bytes arrays we allocate are untyped (or, are all Vec<u8> typed), we shouldn't be allocating and de-allocating, we should just recycle them.

Inspecting the data

What if we just want to look at our decoded data and confirm that it equals what we started from?

#[bench]
fn log_decode_assert(b: &mut Bencher) {
    let log = Log::new();
    let mut bytes = vec![];
    unsafe { encode(&log, &mut bytes); }
    b.bytes = bytes.len() as u64;
    b.iter(|| {
        assert!(unsafe { decode::<Log>(&mut bytes) }.unwrap().0 == &log);
    });
}

This code does a fair bit more work than decode, in that in addition to fixing up pointers it actually examines the serialized text (decode doesn't touch the text; just the pointers to it).

test log_decode_assert ... bench:         106 ns/iter (+/- 24) = 4943 MB/s

This is now taking a bit more time, both because we are looking at more data and because we may have to chase down allocated data from the source Log. It goes ever so slightly faster if we compare our decoded data to another decoded &Log. I'd wager this goes up if we need to track down memory in ten different locations (at ~50ns each), though whether we would end up in that situation is less clear.

Moving data around

What about a more interesting operation, a data shuffle? We'll just take a sequence of these Log things and round robin them between two destination vectors, treat the two vectors as our source, and then repeat forever. We'll do this with one million log entries, using both raw and serialized representations:

test log_shuffle_raw            ... bench:  61,937,093 ns/iter (+/- 6,782,729) = 8460 MB/s
test log_shuffle_serialized     ... bench: 166,930,100 ns/iter (+/- 22,544,995) = 3139 MB/s

This looks pretty great for the raw representation, which only needs about 61ns per element. What is going on is that we only shuffled the struct data, not the pointers, which are happily sitting wherever they were in memory before we started. This transfers ownership, meaning that access to the original copy is lost. To get owned copies we would need to call clone() as we go, doing some allocations and slowing things down a bit.

test log_shuffle_raw_clone      ... bench: 1,046,903,138 ns/iter (+/- 246,788,658) = 500 MB/s

That's a bit different. The previous version was really just memcpying 288 byte regions around (the struct itself) and now that it has to (i) chat with the allocator and (ii) actually look at the allocated data (which post-shuffles is scattered all over memory), well it takes more time, doesn't it?

The serialized version up above is naively going from a decoded &Log and just encoding it, without exploiting the fact that the data are serialized in the first place (and we could have just copied all the bytes). It is worth understanding how we perform even when not in that setting, because we may often get references without direct access to information about the serialized size (e.g. a field in a &Log). Also, the length of the serialized data is not written down anywhere; we discover it indirectly in the deserialization process.

For the sake of experiment, we could mock this up as deserializing each element to discover its length, then ignoring everything about the result other than the number of bytes read, and just copying those bytes over without using encode. Actually, that should work fine any time we start with serialized data, it's just a bit grotty.

test log_shuffle_serialized_opt ... bench: 112,091,319 ns/iter (+/- 3,612,521) = 4674 MB/s

This is still about twice as slow as just moving the 288 bytes of the struct around, which makes sense as the serialized representation is 524 bytes, about twice as long. However, if we aren't able to take ownership of the original allocations, the serialized forms are substantially faster than invoking clone() and working with any non-region type of allocator.

Comments

I'm pretty keen to try out serialized representations for internal differential dataflow data structures. Right now, even just using simple types like String, you see a lot of time spent chatting with the allocator, getting new allocations and releasing old allocations. When you do something complicated like merge a few GB of immutable owned data, you end up doing all sorts of clones, and you'd really rather not.

Plus, and more on this at a future point, if everything is just a Vec<u8> you can blat it out to disk really fast. Or memory map it back in, if something goes horribly wrong (e.g. perhaps you are running a fault tolerance experiment and chose to manually kill your timely workers).

Recycling

Next time: how to quickly turn those &Logs into actual Logs, for long enough to work with them. Hint: recycler.