[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage

JIRA: https://issues.apache.org/jira/browse/SPARK-11271

As reported in the JIRA ticket, when there are very many tasks, the memory usage of MapStatus becomes a problem. Using Spark's own BitSet instead of RoaringBitmap should be more memory-efficient.
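For a rough sense of scale (illustrative arithmetic and reducer counts of mine, not figures from the ticket): a fixed-size bit set tracks one bit per reduce partition, so its footprint is about R/8 bytes for R reducers, flat and predictable regardless of which blocks are empty.

```scala
// Illustrative arithmetic (assumed reducer counts, not from the JIRA):
// a fixed-size bit set over R reduce partitions occupies ceil(R / 64) longs.
object MapStatusBitmapCost {
  def bitSetBytes(numReducers: Int): Long =
    (((numReducers - 1) >> 6) + 1) * 8L // mirrors BitSet.bit2words below

  def main(args: Array[String]): Unit =
    for (r <- Seq(1000, 100000, 1000000))
      println(f"$r%9d reducers -> ${bitSetBytes(r)}%8d bytes per MapStatus bitmap")
}
```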

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #9243 from viirya/mapstatus-bitset.
viirya authored and srowen committed Nov 2, 2015
1 parent e963070 commit e209fa2
Showing 7 changed files with 82 additions and 33 deletions.
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -173,10 +173,6 @@
       <groupId>net.jpountz.lz4</groupId>
       <artifactId>lz4</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.roaringbitmap</groupId>
-      <artifactId>RoaringBitmap</artifactId>
-    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,9 +19,8 @@ package org.apache.spark.scheduler

 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
-import org.roaringbitmap.RoaringBitmap
-
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -133,7 +132,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
     private[this] var loc: BlockManagerId,
     private[this] var numNonEmptyBlocks: Int,
-    private[this] var emptyBlocks: RoaringBitmap,
+    private[this] var emptyBlocks: BitSet,
     private[this] var avgSize: Long)
   extends MapStatus with Externalizable {

@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-    if (emptyBlocks.contains(reduceId)) {
+    if (emptyBlocks.get(reduceId)) {
       0
     } else {
       avgSize
@@ -161,7 +160,7 @@

   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
     loc = BlockManagerId(in)
-    emptyBlocks = new RoaringBitmap()
+    emptyBlocks = new BitSet
     emptyBlocks.readExternal(in)
     avgSize = in.readLong()
   }
@@ -177,15 +176,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
-    val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
+    val emptyBlocks = new BitSet(totalNumBlocks)
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
       } else {
-        emptyBlocks.add(i)
+        emptyBlocks.set(i)
       }
       i += 1
     }
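To make the getSizeForBlock semantics above concrete, here is a small self-contained sketch (my paraphrase of the logic, with invented names, not code from the commit): empty blocks answer 0, and every non-empty block answers the average size.

```scala
import org.apache.spark.util.collection.BitSet

// Hypothetical, simplified mirror of HighlyCompressedMapStatus's logic
// (class and method names are invented for illustration).
class ApproxBlockSizes(uncompressedSizes: Array[Long]) {
  private val emptyBlocks = new BitSet(uncompressedSizes.length)
  private var numNonEmpty = 0
  private var totalSize = 0L

  for (i <- uncompressedSizes.indices) {
    if (uncompressedSizes(i) > 0) {
      numNonEmpty += 1
      totalSize += uncompressedSizes(i)
    } else {
      emptyBlocks.set(i) // one bit per empty block, nothing per non-empty block
    }
  }

  private val avgSize = if (numNonEmpty > 0) totalSize / numNonEmpty else 0L

  // Empty blocks report 0; everything else reports the average size.
  def sizeOf(reduceId: Int): Long = if (emptyBlocks.get(reduceId)) 0L else avgSize
}
```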
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,7 +30,6 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
 import org.apache.avro.generic.{GenericData, GenericRecord}
-import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
@@ -39,7 +38,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
 import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
 
 /**
  * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
@@ -363,12 +362,7 @@ private[serializer] object KryoSerializer {
       classOf[StorageLevel],
       classOf[CompressedMapStatus],
       classOf[HighlyCompressedMapStatus],
-      classOf[RoaringBitmap],
-      classOf[RoaringArray],
-      classOf[RoaringArray.Element],
-      classOf[Array[RoaringArray.Element]],
-      classOf[ArrayContainer],
-      classOf[BitmapContainer],
+      classOf[BitSet],
       classOf[CompactBuffer[_]],
       classOf[BlockManagerId],
       classOf[Array[Byte]],
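The registration matters because with spark.kryo.registrationRequired=true Kryo refuses to serialize unregistered classes, so BitSet must take RoaringBitmap's place in the list. A minimal round-trip sketch under that setting (standalone usage I would expect to work, not code from the commit):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.collection.BitSet

// Sketch: serializing a BitSet with registrationRequired enabled should only
// succeed because KryoSerializer now registers classOf[BitSet] internally.
object KryoBitSetRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
    val ser = new KryoSerializer(conf).newInstance()

    val bits = new BitSet(128)
    bits.set(3)
    bits.set(64)

    val restored = ser.deserialize[BitSet](ser.serialize(bits))
    assert(restored.get(3) && restored.get(64) && !restored.get(4))
  }
}
```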
28 changes: 25 additions & 3 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -17,14 +17,21 @@

 package org.apache.spark.util.collection
 
+import java.io.{Externalizable, ObjectInput, ObjectOutput}
+
+import org.apache.spark.util.{Utils => UUtils}
+
 
 /**
  * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
  * safety/bound checking.
  */
-class BitSet(numBits: Int) extends Serializable {
+class BitSet(private[this] var numBits: Int) extends Externalizable {
 
-  private val words = new Array[Long](bit2words(numBits))
-  private val numWords = words.length
+  private var words = new Array[Long](bit2words(numBits))
+  private def numWords = words.length
+
+  def this() = this(0)
 
   /**
    * Compute the capacity (number of bits) that can be represented
@@ -230,4 +237,19 @@

   /** Return the number of longs it would take to hold numBits. */
   private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1
+
+  override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
+    out.writeInt(numBits)
+    words.foreach(out.writeLong(_))
+  }
+
+  override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
+    numBits = in.readInt()
+    words = new Array[Long](bit2words(numBits))
+    var index = 0
+    while (index < words.length) {
+      words(index) = in.readLong()
+      index += 1
+    }
+  }
 }
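Two details worth noting in the new code. First, the added def this() = this(0) exists because java.io.Externalizable deserialization instantiates the class through its public no-arg constructor before calling readExternal. Second, the payload is a 4-byte bit count followed by one 8-byte word per 64 bits, so a BitSet over N bits serializes to roughly 4 + 8 * ceil(N/64) bytes (ignoring ObjectOutputStream framing). A quick standalone check of the bit2words arithmetic (my example, not part of the commit):

```scala
// The word-count formula copied from BitSet.bit2words above:
// ((numBits - 1) >> 6) + 1 computes ceil(numBits / 64) for numBits > 0.
object Bit2WordsCheck {
  def bit2words(numBits: Int): Int = ((numBits - 1) >> 6) + 1

  def main(args: Array[String]): Unit = {
    assert(bit2words(1) == 1)   // a single bit still occupies one 64-bit word
    assert(bit2words(64) == 1)  // exactly one full word
    assert(bit2words(65) == 2)  // one extra bit spills into a second word
    assert(bit2words(100) == 2) // the 100-bit set in the suite below uses 2 words
    println("bit2words checks passed")
  }
}
```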
6 changes: 0 additions & 6 deletions core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -322,12 +322,6 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     val conf = new SparkConf(false)
     conf.set("spark.kryo.registrationRequired", "true")
 
-    // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
-    // values, and they use a bitmap (dense) if they have more than 4096 values, and an
-    // array (sparse) if they use less. So we just create two cases, one sparse and one dense.
-    // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
-    // empty blocks
-
     val ser = new KryoSerializer(conf).newInstance()
     val denseBlockSizes = new Array[Long](5000)
     val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
49 changes: 49 additions & 0 deletions core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -17,7 +17,10 @@

 package org.apache.spark.util.collection
 
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.util.{Utils => UUtils}
 
 class BitSetSuite extends SparkFunSuite {

@@ -152,4 +155,50 @@
     assert(bitsetDiff.nextSetBit(85) === 85)
     assert(bitsetDiff.nextSetBit(86) === -1)
   }
+
+  test("read and write externally") {
+    val tempDir = UUtils.createTempDir()
+    val outputFile = File.createTempFile("bits", null, tempDir)
+
+    val fos = new FileOutputStream(outputFile)
+    val oos = new ObjectOutputStream(fos)
+
+    // Create BitSet
+    val setBits = Seq(0, 9, 1, 10, 90, 96)
+    val bitset = new BitSet(100)
+
+    for (i <- 0 until 100) {
+      assert(!bitset.get(i))
+    }
+
+    setBits.foreach(i => bitset.set(i))
+
+    for (i <- 0 until 100) {
+      if (setBits.contains(i)) {
+        assert(bitset.get(i))
+      } else {
+        assert(!bitset.get(i))
+      }
+    }
+    assert(bitset.cardinality() === setBits.size)
+
+    bitset.writeExternal(oos)
+    oos.close()
+
+    val fis = new FileInputStream(outputFile)
+    val ois = new ObjectInputStream(fis)
+
+    // Read BitSet from the file
+    val bitset2 = new BitSet(0)
+    bitset2.readExternal(ois)
+
+    for (i <- 0 until 100) {
+      if (setBits.contains(i)) {
+        assert(bitset2.get(i))
+      } else {
+        assert(!bitset2.get(i))
+      }
+    }
+    assert(bitset2.cardinality() === setBits.size)
+  }
 }
5 changes: 0 additions & 5 deletions pom.xml
@@ -623,11 +623,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.roaringbitmap</groupId>
-      <artifactId>RoaringBitmap</artifactId>
-      <version>0.4.5</version>
-    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
