Use Spark BitSet instead of RoaringBitmap to reduce memory usage.
viirya committed Oct 23, 2015
1 parent cdea017 commit 392975d
Showing 3 changed files with 78 additions and 10 deletions.
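For context on the memory claim in the commit title: RoaringBitmap is a compressed bitmap container, while Spark's BitSet is a fixed-size wrapper around an Array[Long], one bit per block. Below is a minimal sketch of that storage model, reusing the bit2words helper that appears in the BitSet.scala diff further down; the byte figure is illustrative and ignores JVM object headers.

// Sketch: fixed-size storage cost of Spark's BitSet.
// bit2words mirrors the private helper shown in BitSet.scala below.
object BitSetFootprint {
  private def bit2words(numBits: Int): Int = ((numBits - 1) >> 6) + 1

  def main(args: Array[String]): Unit = {
    val numBlocks = 200000                 // hypothetical block count
    val words = bit2words(numBlocks)       // one Long holds 64 bits
    println(s"$numBlocks bits -> $words longs (~${words * 8} bytes)")
  }
}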
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils

/**
@@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var emptyBlocks: BitSet,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.contains(reduceId)) {
if (emptyBlocks.get(reduceId)) {
0
} else {
avgSize
@@ -161,7 +160,7 @@ private[spark] class HighlyCompressedMapStatus private (

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks = new BitSet(0)
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
@@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
emptyBlocks.add(i)
emptyBlocks.set(i)
}
i += 1
}
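To make the lookup semantics above concrete: HighlyCompressedMapStatus records only which blocks are empty plus the average size of the non-empty ones, so getSizeForBlock returns 0 for an empty block and avgSize otherwise. A self-contained sketch with hypothetical values, using java.util.BitSet so it runs without Spark on the classpath (the real code uses org.apache.spark.util.collection.BitSet):

// Sketch of the getSizeForBlock behavior from the hunk above.
object MapStatusLookupSketch {
  def main(args: Array[String]): Unit = {
    val emptyBlocks = new java.util.BitSet(8)
    Seq(1, 4, 6).foreach(i => emptyBlocks.set(i))   // hypothetical empty blocks
    val avgSize = 1024L                             // hypothetical average size

    def getSizeForBlock(reduceId: Int): Long =
      if (emptyBlocks.get(reduceId)) 0L else avgSize

    (0 until 8).foreach(i => println(s"block $i -> ${getSizeForBlock(i)} bytes"))
  }
}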
26 changes: 23 additions & 3 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,14 +17,19 @@

package org.apache.spark.util.collection

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.util.{Utils => UUtils}


/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
class BitSet(numBits: Int) extends Serializable {
class BitSet(private[this] var numBits: Int) extends Externalizable {

private val words = new Array[Long](bit2words(numBits))
private val numWords = words.length
private var words = new Array[Long](bit2words(numBits))
private def numWords = words.length

/**
* Compute the capacity (number of bits) that can be represented
@@ -230,4 +235,19 @@ class BitSet(numBits: Int) extends Serializable {

/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
out.writeInt(numBits)
words.foreach(out.writeLong(_))
}

override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
numBits = in.readInt()
words = new Array[Long](bit2words(numBits))
var index = 0
while (index < words.length) {
words(index) = in.readLong()
index += 1
}
}
}
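The wire format introduced above is simply an Int for numBits followed by one Long per 64-bit word. A minimal in-memory round trip, assuming the patched Spark class is on the classpath (the file-based test in the next diff exercises the same path more thoroughly):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.util.collection.BitSet

object BitSetRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val bits = new BitSet(100)
    bits.set(3)
    bits.set(97)

    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    bits.writeExternal(oos)
    oos.close()

    // Read it back into an empty BitSet, mirroring readExternal's contract.
    val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    val restored = new BitSet(0)
    restored.readExternal(ois)
    assert(restored.get(3) && restored.get(97))
  }
}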
49 changes: 49 additions & 0 deletions core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,7 +17,10 @@

package org.apache.spark.util.collection

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.SparkFunSuite
import org.apache.spark.util.{Utils => UUtils}

class BitSetSuite extends SparkFunSuite {

@@ -152,4 +155,50 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}

test("read and write externally") {
val tempDir = UUtils.createTempDir()
val outputFile = File.createTempFile("bits", null, tempDir)

val fos = new FileOutputStream(outputFile)
val oos = new ObjectOutputStream(fos)

// Create BitSet
val setBits = Seq(0, 9, 1, 10, 90, 96)
val bitset = new BitSet(100)

for (i <- 0 until 100) {
assert(!bitset.get(i))
}

setBits.foreach(i => bitset.set(i))

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset.get(i))
} else {
assert(!bitset.get(i))
}
}
assert(bitset.cardinality() === setBits.size)

bitset.writeExternal(oos)
oos.close()

val fis = new FileInputStream(outputFile)
val ois = new ObjectInputStream(fis)

// Read BitSet from the file
val bitset2 = new BitSet(0)
bitset2.readExternal(ois)

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset2.get(i))
} else {
assert(!bitset2.get(i))
}
}
assert(bitset2.cardinality() === setBits.size)
}
}
