
Added a fast and low-memory append-only map for shuffle operations #823

Closed
wants to merge 2 commits into from

Conversation

mateiz
Member

@mateiz mateiz commented Aug 14, 2013

This is an attempt to reduce the CPU cost and memory usage of shuffles by taking advantage of the properties their hash maps need. In particular, those hash maps are append-only, and a common operation is updating a key's value based on the old value. The included AppendOnlyMap class uses open addressing, so it needs less space than Java's HashMap (there is no linked list per bucket), does not support deletes, and provides a changeValue operation that updates a key in place without traversing the hash chain twice.

This is just an experiment for now, since it still needs testing in real Spark apps, but in micro-benchmarks against java.util.HashMap and scala.collection.mutable.HashMap, it is 20-30% smaller and 10-40% faster, depending on the number and type of keys. It's also noticeably faster than fastutil's Object2ObjectOpenHashMap.
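To make the idea concrete, here is a minimal standalone sketch of an append-only open-addressing map with a changeValue operation. This is illustrative only, not the patch's code: the class name is made up, and it uses simple linear probing rather than the patch's probing scheme.

```scala
// Hypothetical sketch: keys and values interleaved in one flat array
// (key at 2*i, value at 2*i+1), open addressing, no delete support.
class TinyAppendOnlyMap[K, V](capacity: Int = 64) {
  require((capacity & (capacity - 1)) == 0, "capacity must be a power of 2")
  private val data = new Array[AnyRef](2 * capacity)
  private val mask = capacity - 1

  // Update a key's value from its old value, walking the probe chain once.
  // updateFunc receives (keyAlreadyPresent, oldValue).
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    var pos = key.hashCode & mask
    while (true) {
      val curKey = data(2 * pos)
      if (curKey == null) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = key.asInstanceOf[AnyRef]
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (curKey == key) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        pos = (pos + 1) & mask // linear probing, for brevity
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```

For a shuffle-style aggregation, `changeValue(key, ...)` folds each new record into the running value with a single probe-chain traversal, which is the operation the description above says a get-then-put map would do twice.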

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/575/

*/
private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
if (!isPowerOf2(initialCapacity)) {
Member

Maybe change this to

require(isPowerOf2(initialCapacity), "Initial capacity must be power of 2")

and ditto for initialCapacity >= 1 << 30

Member

Actually, it'd be better to get the user to pass in an arbitrary initial size, and then we just round it up to the next power of 2.
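Rounding an arbitrary requested size up to the next power of 2 can be done with Integer.highestOneBit; this is only a sketch of the suggestion, and the function name and range check here are illustrative, not from the patch:

```scala
// Round n up to the next power of 2 (n must be in 1 .. 2^30,
// since 2^30 is the largest power of 2 that fits in a positive Int).
def nextPowerOf2(n: Int): Int = {
  require(n >= 1 && n <= (1 << 30), "requested size out of range")
  val highBit = Integer.highestOneBit(n) // largest power of 2 <= n
  if (highBit == n) n else highBit << 1
}
```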

@mateiz
Member Author

mateiz commented Aug 14, 2013

FYI, for anyone interested, my benchmark vs fastutil and Java/Scala maps is at http://www.cs.berkeley.edu/~matei/maptest.tgz.

@mateiz
Member Author

mateiz commented Aug 14, 2013

You can run it with, for example, sbt/sbt "run 10000 10000000". Vary the first number to change the number of keys -- things like cache locality matter more or less with different numbers of them.

val mask = (data.length / 2) - 1
var pos = rehash(key.hashCode) & mask
var i = 1
while (true) {
Contributor

Are we sure we'll always break out of this while (true)?

Member Author

Yes, that's what the quadratic probing method and the power-of-2 table size guarantee (see http://en.wikipedia.org/wiki/Quadratic_probing).

Contributor

My concern was that incrementSize(), and consequently growTable(), is only called after this loop adds an entry, meaning there must be at least one free slot on method entry or the loop never terminates. Since initialCapacity >= 1 is required, a free slot exists on the first call; none would exist on a subsequent call only if curSize could reach capacity without the table growing, which could happen (modulo multiplication rounding errors) only if LOAD_FACTOR >= 1.

One could iterate only capacity times and then fall through to an error message, as in CLRS, but that would add a check per iteration.

I was thinking that this search logic could be extracted and shared with apply() and changeValue(). I don't know whether the Scala or JIT compiler would inline it with the same performance. But then I realized that the most likely case, which should be tested first, differs between putInto() and apply()/changeValue(): the former expects to find an empty slot, while the latter expect to find the given key.

Last week you were talking of using double hashing...? I notice that fastutil seemed to have a performance issue with that and replaced it with linear probing.
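The termination claim above can be sanity-checked directly: with a power-of-2 table, probing with triangular increments (+1, +2, +3, ...) visits every slot exactly once before repeating, so the loop finds a free or matching slot whenever one exists. An illustrative check (not code from the patch):

```scala
// Enumerate the first `size` positions of a triangular probe sequence,
// i.e. start, start+1, start+3, start+6, ... all taken mod `size`.
def probePositions(start: Int, size: Int): Seq[Int] = {
  require((size & (size - 1)) == 0, "size must be a power of 2")
  var pos = start & (size - 1)
  (1 to size).map { i =>
    val cur = pos
    pos = (pos + i) & (size - 1)
    cur
  }
}
```

probePositions(start, size).toSet has exactly size elements for any start, which is the permutation property quadratic (triangular) probing relies on.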

@ryanlecompte
Contributor

Exciting stuff! Looking forward to any shuffle benchmarks/comparisons in a real cluster with this change.

* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] with Serializable {
Contributor

Might as well [K <: AnyRef, V <: AnyRef] and avoid all of the downcasting to AnyRef.

Member Author

Unfortunately that won't make it work for primitive types, which we allow in some clients that use this class (e.g. Aggregator and CoGroupedRDD). The casting is ugly but it seems necessary until we build some specialized classes for those types, which would be a later project.

data = newData
capacity = newCapacity
}

Member Author

Just dealt with this by rounding capacity up to the next power of 2, as Reynold suggested

@AmplabJenkins

Thank you for submitting this pull request.

All automated tests for this request have passed.

Refer to this link for build results: http://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/580/

var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(k) || curKey.eq(null) || curKey == k) {
Contributor

When curKey.eq(null), the key isn't present, and data(2 * pos + 1) is returned, which should be null. Clearer to say

else if (curKey.eq(null)) { return null.asInstanceOf[V] }

in parallel with the similar logic in changeValue().
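For illustration, with the suggested early exit the lookup would read something like this standalone sketch (the interleaved-array layout and triangular probing are taken from the surrounding diff; the function signature is made up so the sketch is self-contained):

```scala
// Look up `key` in an interleaved key/value array of `capacity` slots
// (key at 2*pos, value at 2*pos + 1); capacity must be a power of 2.
def lookup[V](data: Array[AnyRef], capacity: Int, key: AnyRef): V = {
  val mask = capacity - 1
  var pos = key.hashCode & mask
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {
      return null.asInstanceOf[V]            // key absent: explicit exit
    } else if (curKey.eq(key) || curKey == key) {
      return data(2 * pos + 1).asInstanceOf[V]
    } else {
      pos = (pos + i) & mask                 // triangular probing
      i += 1
    }
  }
  throw new IllegalStateException("unreachable")
}
```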

@wannabeast
Contributor

Are these the only two use cases for AppendOnlyMap, or should I expect others?

if (k.eq(null)) {
return nullValue
}
val mask = capacity - 1
Contributor

Shouldn't mask be an instance variable that you update in conjunction with capacity?

@wannabeast
Contributor

It should be noted that the Scala library also has an OpenHashMap, but it's probably slower due to an extra level of indirection to the OpenEntry[Key, Value] elements of its internal Array.

@wannabeast
Contributor

So far I've found no discernible difference in the performance of k-means with this change.

(attached chart: sparkkmeans2 performance)

@mateiz
Member Author

mateiz commented Aug 27, 2013

Ah, interesting, thanks for looking at it. K-means is actually fairly CPU-heavy due to code in the application itself, so it might be better to try another thing (e.g. the group-by test in spark-perf). But at least the good news is that it doesn't hurt performance either.

Anyway, I'm not going to merge this in 0.8.0 until we have more tests.

@mateiz
Member Author

mateiz commented Oct 9, 2013

Closing this since it's now here: https://github.com/apache/incubator-spark/pull/44

@mateiz mateiz closed this Oct 9, 2013
xiajunluan pushed a commit to xiajunluan/spark that referenced this pull request May 30, 2014
…lasses

See https://issues.apache.org/jira/browse/SPARK-1879 -- builds with Hadoop2 and Hive ran out of PermGen space in spark-shell, when those things added up with the Scala compiler.

Note that users can still override it by setting their own Java options with this change. Their options will come later in the command string than the -XX:MaxPermSize=128m.

Author: Matei Zaharia <matei@databricks.com>

Closes mesos#823 from mateiz/spark-1879 and squashes the following commits:

6bc0ee8 [Matei Zaharia] Increase MaxPermSize to 128m since some of our builds have lots of classes