Skip to content

Commit

Permalink
Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of …
Browse files Browse the repository at this point in the history
…RoaringBitmap to reduce memory usage"

This reverts commit e209fa2.
  • Loading branch information
davies committed Nov 16, 2015
1 parent 985b38d commit 3c02508
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 82 deletions.
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down
13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: BitSet,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
extends MapStatus with Externalizable {

Expand All @@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
if (emptyBlocks.get(reduceId)) {
if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
Expand All @@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new BitSet
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
}
Expand All @@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
// From a compression standpoint, it shouldn't matter whether we track empty or non-empty
// blocks. From a performance standpoint, we benefit from tracking empty blocks because
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
val emptyBlocks = new BitSet(totalNumBlocks)
while (i < totalNumBlocks) {
var size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
totalSize += size
} else {
emptyBlocks.set(i)
emptyBlocks.add(i)
}
i += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
Expand All @@ -38,7 +39,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.{BitSet, CompactBuffer}
import org.apache.spark.util.collection.CompactBuffer

/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
Expand Down Expand Up @@ -362,7 +363,12 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[BitSet],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
Expand Down
28 changes: 3 additions & 25 deletions core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,14 @@

package org.apache.spark.util.collection

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.apache.spark.util.{Utils => UUtils}


/**
* A simple, fixed-size bit set implementation. This implementation is fast because it avoids
* safety/bound checking.
*/
class BitSet(private[this] var numBits: Int) extends Externalizable {
class BitSet(numBits: Int) extends Serializable {

private var words = new Array[Long](bit2words(numBits))
private def numWords = words.length

def this() = this(0)
private val words = new Array[Long](bit2words(numBits))
private val numWords = words.length

/**
* Compute the capacity (number of bits) that can be represented
Expand Down Expand Up @@ -237,19 +230,4 @@ class BitSet(private[this] var numBits: Int) extends Externalizable {

/** Return the number of longs it would take to hold numBits. */
private def bit2words(numBits: Int) = ((numBits - 1) >> 6) + 1

override def writeExternal(out: ObjectOutput): Unit = UUtils.tryOrIOException {
out.writeInt(numBits)
words.foreach(out.writeLong(_))
}

override def readExternal(in: ObjectInput): Unit = UUtils.tryOrIOException {
numBits = in.readInt()
words = new Array[Long](bit2words(numBits))
var index = 0
while (index < words.length) {
words(index) = in.readLong()
index += 1
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")

// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
// array (sparse) if they use less. So we just create two cases, one sparse and one dense.
// and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
// empty blocks

val ser = new KryoSerializer(conf).newInstance()
val denseBlockSizes = new Array[Long](5000)
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.util.collection

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.SparkFunSuite
import org.apache.spark.util.{Utils => UUtils}

class BitSetSuite extends SparkFunSuite {

Expand Down Expand Up @@ -155,50 +152,4 @@ class BitSetSuite extends SparkFunSuite {
assert(bitsetDiff.nextSetBit(85) === 85)
assert(bitsetDiff.nextSetBit(86) === -1)
}

test("read and write externally") {
val tempDir = UUtils.createTempDir()
val outputFile = File.createTempFile("bits", null, tempDir)

val fos = new FileOutputStream(outputFile)
val oos = new ObjectOutputStream(fos)

// Create BitSet
val setBits = Seq(0, 9, 1, 10, 90, 96)
val bitset = new BitSet(100)

for (i <- 0 until 100) {
assert(!bitset.get(i))
}

setBits.foreach(i => bitset.set(i))

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset.get(i))
} else {
assert(!bitset.get(i))
}
}
assert(bitset.cardinality() === setBits.size)

bitset.writeExternal(oos)
oos.close()

val fis = new FileInputStream(outputFile)
val ois = new ObjectInputStream(fis)

// Read BitSet from the file
val bitset2 = new BitSet(0)
bitset2.readExternal(ois)

for (i <- 0 until 100) {
if (setBits.contains(i)) {
assert(bitset2.get(i))
} else {
assert(!bitset2.get(i))
}
}
assert(bitset2.cardinality() === setBits.size)
}
}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.4.5</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
Expand Down

0 comments on commit 3c02508

Please sign in to comment.