[SPARK-11583] [Core] MapStatus Using RoaringBitmap More Properly #9661

Closed · wants to merge 7 commits into from

Conversation

yaooqinn (Member)

---------------Part I----------------

test cases: BitSet vs. RoaringBitmap

sparse case: for each task, 10 blocks contain data and the others don't

sc.makeRDD(1 to 40950, 4095).groupBy(x=>x).top(5)

dense case: for each task, most blocks contain data and few don't

sc.makeRDD(1 to 16769025, 4095).groupBy(x=>x).top(5)
sc.makeRDD(1 to 16380000, 4095).groupBy(x=>x).top(5)

test tool

jmap -dump:format=b,file=heap.bin <pid>

test branches:

branch-1.5, master

memory usage

2.1 RoaringBitmap--sparse

Class Name                                            | Objects | Shallow Heap | Retained Heap
org.apache.spark.scheduler.HighlyCompressedMapStatus  | 4,095   | 131,040      | >= 34,135,920

my explanation: 4095 * short[4095-10] = 4095 * 16 * 4085 / 8 ≈ 34,135,920 (each of the 4,095 statuses stores one 16-bit short per empty block, and 4,085 blocks are empty)

2.2.1 RoaringBitmap--full

Class Name                                            | Objects | Shallow Heap | Retained Heap
org.apache.spark.scheduler.HighlyCompressedMapStatus  | 4,095   | 131,040      | >= 360,360

my explanation: RoaringBitmap(0); no blocks are empty, so every bitmap is empty and only object overhead remains

2.2.2 RoaringBitmap--very dense

Class Name                                            | Objects | Shallow Heap | Retained Heap
org.apache.spark.scheduler.HighlyCompressedMapStatus  | 4,095   | 131,040      | >= 1,441,440

my explanation: 4095 * short[95] = 4095 * 16 * 95 / 8 = 778,050 (+ others = 1,441,440)

2.3 BitSet--sparse

Class Name                                            | Objects | Shallow Heap | Retained Heap
org.apache.spark.scheduler.HighlyCompressedMapStatus  | 4,095   | 131,040      | >= 2,391,480

my explanation: 4095 * 4095 = 16,769,025 bits ≈ 2,096,128 bytes (+ others = 2,391,480)

2.4 BitSet--full

Class Name                                            | Objects | Shallow Heap | Retained Heap
org.apache.spark.scheduler.HighlyCompressedMapStatus  | 4,095   | 131,040      | >= 2,391,480

my explanation: same as above

conclusion

memory usage (Retained Heap):
RoaringBitmap--full < RoaringBitmap--very dense < BitSet--full = BitSet--sparse < RoaringBitmap--sparse

In this specific case, i.e. RoaringBitmap--sparse, we should use the RoaringBitmap to mark non-empty blocks instead of empty ones. That way, memory usage stays low in all cases.
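A minimal sketch of this asymmetry (illustrative only, not the PR's code), using getSizeInBytes to compare the two choices for one sparse task:

import org.roaringbitmap.RoaringBitmap

// One sparse task: blocks 0-9 contain data, blocks 10-4094 are empty.
val markEmpty = new RoaringBitmap()    // current approach: mark the 4,085 empty blocks
(10 until 4095).foreach(markEmpty.add)

val markNonEmpty = new RoaringBitmap() // proposed approach: mark the 10 non-empty blocks
(0 until 10).foreach(markNonEmpty.add)

// markEmpty holds ~4,085 shorts (~8 KB); multiplied by 4,095 tasks this is
// the ~34 MB figure above. markNonEmpty holds only 10 shorts plus overhead.
println(markEmpty.getSizeInBytes)
println(markNonEmpty.getSizeInBytes)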

---------------Part II----------------

test cases

sparse case: 4085 empty --- S

sc.makeRDD(1 to 40950, 4095).groupBy(x=>x).top(5)

dense case: 95 empty ---- D

sc.makeRDD(1 to 16380000, 4095).groupBy(x=>x).top(5)

test result

1 without runOptimize or trim

case | Class Name                | Objects | Shallow Heap | Retained Heap
S    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 34,135,920
D    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 1,441,440

2 with runOptimize and trim

case | Class Name                | Objects | Shallow Heap | Retained Heap
S    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 687,960
D    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 687,960

3 with runOptimize and trim, plus isSparse

case | Class Name                | Objects | Shallow Heap | Retained Heap
S    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 687,960
D    | HighlyCompressedMapStatus | 4,095   | 131,040      | >= 687,960

conclusion

  1. With runOptimize, RoaringBitmap saves a lot of memory, especially in S.
  2. After runOptimize, separating out the sparse case seems unnecessary in this 4,095-bit test.
  3. What about 100k or 200k bits? Is runOptimize still enough, or should we use my idea of separating the sparse case?
     ---- I don't have an environment for such large tasks (help!)

---------------Part III----------------

For the questions raised in Part II:

continuous

scala> import org.roaringbitmap._
import org.roaringbitmap._

scala> val r = new RoaringBitmap()
r: org.roaringbitmap.RoaringBitmap = {}

scala> for (i <- 1 to 2000000)  r.add(i)

scala> r.runOptimize
res1: Boolean = true

scala> r.contains(2000000)
res2: Boolean = true
bits | original size | optimized size
200K | 33,056        | 328
2M   | 255,472       | 1,768
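(An assumption on how these size columns were measured: RoaringBitmap's getSizeInBytes, the same call squito uses later in this thread; e.g. for the optimized 2M bitmap:

scala> r.getSizeInBytes
res3: Int = 1768
)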

discontinuous

scala> import org.roaringbitmap._
import org.roaringbitmap._

scala> val r = new RoaringBitmap()
r: org.roaringbitmap.RoaringBitmap = {}

scala> for (i <- 1 to 2000000) if(i%10==0) r.add(i)

scala> r.runOptimize
res1: Boolean = false

scala> r.trim

scala> r.contains(2000000)
res2: Boolean = true
bits | original size | optimized size | trimmed size
200K | 255,472       | 255,472        | 254,072
2M   | 2,516,688     | 2,516,688      | 2,516,272

conclusion

In the discontinuous case, runOptimize returns false, and the size after r.trim does not shrink the way it does in the continuous case. That makes sense: run-length encoding only pays off when set bits form long runs, and with every 10th bit set there are no runs to compress (a run costs roughly twice as many bytes as a single array entry, so run-encoding isolated bits would make the bitmap bigger).

@AmplabJenkins

Can one of the admins verify this patch?

@srowen (Member) commented Nov 12, 2015

The problem is that this brings back RoaringBitmap, and one of the reasons we removed it was, separately, that Kryo wasn't serializing it properly. You'd have to address that too.

What is your conclusion, though? Why is this better than your alternative?
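For reference, a minimal sketch of one way the Kryo issue could be addressed (an illustrative assumption, not the fix that eventually landed): delegate to RoaringBitmap's own serialize/deserialize, which take a java.io.DataOutput/DataInput, using the fact that Kryo's Output/Input extend OutputStream/InputStream:

import java.io.{DataInputStream, DataOutputStream}

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.roaringbitmap.RoaringBitmap

// Use RoaringBitmap's portable serialization format instead of letting Kryo
// reflectively serialize its internal container arrays.
class RoaringBitmapSerializer extends Serializer[RoaringBitmap] {
  override def write(kryo: Kryo, output: Output, bitmap: RoaringBitmap): Unit = {
    val out = new DataOutputStream(output) // Kryo's Output is a java.io.OutputStream
    bitmap.serialize(out)
    out.flush()
  }

  override def read(kryo: Kryo, input: Input, cls: Class[RoaringBitmap]): RoaringBitmap = {
    val bitmap = new RoaringBitmap()
    bitmap.deserialize(new DataInputStream(input)) // Kryo's Input is a java.io.InputStream
    bitmap
  }
}

// Registration, e.g. in a KryoRegistrator:
//   kryo.register(classOf[RoaringBitmap], new RoaringBitmapSerializer)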

@@ -174,6 +174,11 @@
      <artifactId>lz4</artifactId>
    </dependency>
    <dependency>
      <groupId>org.roaringbitmap</groupId>
      <artifactId>RoaringBitmap</artifactId>
      <version>0.4.5</version>
Contributor

I'd recommend using version 0.5.10 or better (http://central.maven.org/maven2/org/roaringbitmap/RoaringBitmap/0.5.10/).

Contributor

In fact, the version should be removed here; it should be inherited from the parent pom.

@squito (Contributor) commented Nov 12, 2015

@srowen There is still some analysis left to be done, but I think there is a growing case that roaring is actually the right way to go, with some minor tweaks. So I'll do more reviewing here, assuming the analysis works out the way we expect, but this is still pending some final details from the discussion on the jira.

if (numNonEmptyBlocks < totalNumBlocks / 2) {
  new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
} else {
  new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, isSparse = false)
}
Contributor

(a) isSparse is not the right name and (b) we might not even need to bother with tracking empty vs. non-empty, after we see the results of calling runOptimize. In fact, if they are the same, I'd suggest we change this code to consistently track nonEmpty blocks, just for clarity.
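A sketch of that suggestion (hypothetical code, not the PR's final version): always record the non-empty blocks and rely on runOptimize/trim for compactness, dropping the isSparse branch entirely:

import org.roaringbitmap.RoaringBitmap

// Hypothetical helper: track only non-empty blocks regardless of density.
def compress(uncompressedSizes: Array[Long]): (RoaringBitmap, Long) = {
  val nonEmptyBlocks = new RoaringBitmap()
  var totalSize = 0L
  var count = 0
  var i = 0
  while (i < uncompressedSizes.length) {
    if (uncompressedSizes(i) > 0) {
      nonEmptyBlocks.add(i)
      totalSize += uncompressedSizes(i)
      count += 1
    }
    i += 1
  }
  nonEmptyBlocks.runOptimize() // run-length encode long runs, when profitable
  nonEmptyBlocks.trim()        // release over-allocated container capacity
  val avgSize = if (count > 0) totalSize / count else 0L
  (nonEmptyBlocks, avgSize)
}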

@yaooqinn (Member, Author)

@squito @lemire @srowen

(The test cases, results, and conclusions are the same as in Part II of the description above.)

@yaooqinn (Member, Author)

(The continuous/discontinuous experiments and their results are the same as in Part III of the description above.)

I think my idea of separating the sparse case is still useful: it lets the RoaringBitmap hold fewer bits when runOptimize cannot compress.

Note: here isSparse refers to the ratio of non-empty blocks to total blocks, not to the RoaringBitmap's internal representation.

If we use isSparse to ensure we add fewer bits to the RoaringBitmap:

scala> import org.roaringbitmap._
import org.roaringbitmap._

scala> val r = new RoaringBitmap()
r: org.roaringbitmap.RoaringBitmap = {}

scala> r.add(1)

scala> r.add(200000)

scala> r.add(100000)

scala> r.runOptimize
res5: Boolean = false
bits    | original size | optimized size
200K(3) | 256           | 256

Although runOptimize returned false, the total size is still very small.

@squito (Contributor) commented Nov 13, 2015

@yaooqinn thanks for the additional analysis, but I would really prefer that this be written up and attached to the JIRA for reference -- extended discussions in the PR are harder to keep track of later on. Also include the code and the additional cases, in particular the worst-case analysis from alternating bits. If you would prefer that someone else put those details together (maybe me or @lemire), we can help, but it would be great if you could go that last extra mile :)

I don't think the analysis of the "discontinuous" case and whether to track empty / non-empty blocks is correct. I wouldn't say that runOptimize "fails" in any of the cases -- there are times when it doesn't decrease the size, but that doesn't mean that another approach would make the size significantly smaller. The key question is: does the memory usage change if we track empty or non-empty blocks? The main case we should be worried about is when the bitset is very full; we might be better off inverting all the bits. Here's a test for that, taking a bitset of 200k elements where all the elements are set except for every 1000th element. We compare the size (a) after adding all elements, (b) after calling runOptimize, and (c) after flipping the bits. Also, to be extra sure, I compare against directly adding the bits in their flipped state, to check that there isn't a size difference from calling flip at the end:

import org.roaringbitmap._
val n = 2e5.toInt
val orig = new RoaringBitmap()
val flip1 = new RoaringBitmap()
(0 until n).foreach { idx =>
  if (idx % 1000 == 0) {
    flip1.add(idx)
  } else {
    orig.add(idx)
  }
}
val flip2 = orig.clone()
flip2.flip(0, n)
val origOptimized = orig.clone()
origOptimized.runOptimize()

scala> orig.getSizeInBytes
res1: Int = 31374

scala> flip1.getSizeInBytes
res2: Int = 432

scala> flip2.getSizeInBytes
res4: Int = 432

scala> origOptimized.getSizeInBytes
res6: Int = 844

This shows that the flipped version is a bit better than the version w/ just runOptimize (as Daniel had indicated in earlier comments). However, the difference is pretty small; IMO that's not worth worrying much about.

There are two other reasons we might want to use the flipped bits: (1) memory used while building the bitset and (2) time taken to build the bitset. However, both of those reasons are much less important. And in either case, we could only improve things in all cases by making two loops through the array -- one to count the non-zeros and decide which version is sparser, and then a second pass to create the bitset. This is way less important, so I wouldn't really worry about it now. Certainly if we just stick to always tracking the empty blocks it's no worse than what we have now or what was in 1.5 and before.
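A sketch of that two-pass idea (hypothetical, and as noted above, probably not worth the trouble):

import org.roaringbitmap.RoaringBitmap

// Pass 1: count non-empty blocks to decide which side is sparser.
// Pass 2: populate the bitmap with whichever side has fewer bits.
def sparserSide(sizes: Array[Long]): (RoaringBitmap, Boolean) = {
  val numNonEmpty = sizes.count(_ > 0)
  val trackNonEmpty = numNonEmpty <= sizes.length / 2
  val bits = new RoaringBitmap()
  sizes.indices.foreach { i =>
    if ((sizes(i) > 0) == trackNonEmpty) bits.add(i)
  }
  bits.runOptimize()
  (bits, trackNonEmpty) // the flag records which side the bitmap represents
}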

import org.roaringbitmap.RoaringBitmap

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Contributor

Could you update your imports to place scala packages before third-party ones (as before this PR)?

Member Author

OK

@davies (Contributor) commented Nov 16, 2015

@yaooqinn This PR looks good to me overall. Since your patch basically reverts #9243, I'd like to revert that first, then rebase this PR, to make the history clearer. We still need some changes to address SPARK-11016.

@srowen Does this sound good to you?

@lemire @squito @srowen Thanks for all the valuable discussion.

cc @rxin @JoshRosen

@rxin (Contributor) commented Nov 16, 2015

SGTM.

@gssiyankai

@davies,
I've worked on the issue SPARK-11016.
The fix is available in this PR: yaooqinn#1

@srowen (Member) commented Nov 16, 2015

@davies I don't necessarily think we have to revert first. The original change did fix SPARK-11016 and, eh, sort of fixed one type of issue described by SPARK-11271 but really introduced a different related potential problem. This is a further improvement that happens to walk back part of the change.

Keep in mind the original PR is in branch 1.6 so I want to make sure we leave 1.6 in a good state if it's going to be cut soon.

This PR does not include a fix for SPARK-11016, and merging it as-is would technically cause that problem to occur again. I think it would have to include some of the changes proposed by Charles Allen in the JIRA. Not hard.

@davies (Contributor) commented Nov 16, 2015

@srowen If we are going to use RoaringBitmap, then #9243 does not really fix SPARK-11016. It introduces a performance regression for common workloads (when only a few blocks are empty, the memory consumption of MapStatus is bigger than before), so I don't think branch 1.6 is in a healthy state now.

Since SPARK-11016 is a bug fix, it's still OK to fix it even though the 1.6 preview is published.

@srowen (Member) commented Nov 16, 2015

It did fix the problem, but only by making it a moot point. If RoaringBitmap comes back, then it just needs a real fix. I'm not arguing against this change, of course; just making sure it doesn't take a step backwards for two forwards.

@rxin (Contributor) commented Nov 16, 2015

I talked to @davies offline just now -- he thinks the diff of this patch would be clearer if it doesn't need to include the "revert" of the last patch. I don't have a super strong opinion, but given he is willing to spend the time, I'd say he should make the call, since nobody else feels super strongly about it.

@rxin (Contributor) commented Nov 16, 2015

And I think we should merge it into 1.6 too. I was busy when we were reviewing the previous patch but I did have some concerns about changing the data structure of a fundamental part of the shuffle code path. And that's why I never explicitly LGTMed the old patch.

@srowen (Member) commented Nov 16, 2015

Sounds good on all counts, as long as the serialization fix can be worked in too.

@davies (Contributor) commented Nov 16, 2015

@yaooqinn #9243 is reverted; could you rebase this onto master? Hopefully we will merge it soon (also into 1.6).

@yaooqinn (Member, Author)

$ git push https://github.com/yaooqinn/spark.git mapstatus-roaring:test
Counting objects: 5581, done.
Delta compression using up to 4 threads.
Compressing objects: 100% (1765/1765), done.
Writing objects: 100% (3957/3957), 908.68 KiB | 0 bytes/s, done.
Total 3957 (delta 1701), reused 3171 (delta 1016)
error: RPC failed; result=22, HTTP code = 403
fatal: The remote end hung up unexpectedly
fatal: The remote end hung up unexpectedly
Everything up-to-date

After git rebase, I can't push to my own repo; it fails with a 403, while my earlier backup of this branch still pushes fine. Is there something wrong with the master branch?

@davies (Contributor) commented Nov 17, 2015

@yaooqinn I rebased and did not have any problem; could you try git@github.com: instead?

@davies (Contributor) commented Nov 17, 2015

@yaooqinn I have fixed the conflict for you (you still get the credit when it is merged), and also upgraded RoaringBitmap to 0.5.11.

@yaooqinn (Member, Author)

@davies thanks.

Fix RoaringBitmap serialization with Kryo
@andrewor14 (Contributor)

@yaooqinn can you close this now that we have #9746?

@yaooqinn yaooqinn closed this Nov 18, 2015
asfgit pushed a commit that referenced this pull request Nov 18, 2015
This PR upgrades RoaringBitmap to 0.5.10 to optimize the memory layout; MapStatus will be much smaller when most blocks are empty.

This PR is based on #9661 (with conflicts fixed); see all of the comments at #9661.

Author: Kent Yao <yaooqinn@hotmail.com>
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>

Closes #9746 from davies/roaring_mapstatus.

(cherry picked from commit e33053e)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
asfgit pushed a commit that referenced this pull request Nov 18, 2015

(Same commit message and authors as above, without the cherry-pick note.)
kiszk pushed a commit to kiszk/spark-gpu that referenced this pull request Dec 26, 2015

(Same commit message and authors as above.)