From eb79e0a45d5743ab6b5594084f9c825ea5758ab8 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Tue, 17 Nov 2015 19:44:29 -0800 Subject: [PATCH] [SPARK-11583] [CORE] MapStatus Using RoaringBitmap More Properly This PR upgrade the version of RoaringBitmap to 0.5.10, to optimize the memory layout, will be much smaller when most of blocks are empty. This PR is based on #9661 (fix conflicts), see all of the comments at https://github.com/apache/spark/pull/9661 . Author: Kent Yao Author: Davies Liu Author: Charles Allen Closes #9746 from davies/roaring_mapstatus. --- .../apache/spark/scheduler/MapStatus.scala | 5 +-- .../spark/serializer/KryoSerializer.scala | 6 ++-- .../spark/scheduler/MapStatusSuite.scala | 31 +++++++++++++++++++ pom.xml | 2 +- 4 files changed, 38 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 1efce124c0..b2e9a97129 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus( /** * A [[MapStatus]] implementation that only stores the average size of non-empty blocks, - * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap - * is compressed. + * plus a bitmap for tracking which blocks are empty. * * @param loc location where the task is being executed * @param numNonEmptyBlocks the number of non-empty blocks @@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus { } else { 0 } + emptyBlocks.trim() + emptyBlocks.runOptimize() new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize) } } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 1bcb3175a3..d5ba690ed0 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -17,7 +17,7 @@ package org.apache.spark.serializer -import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput} +import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable @@ -25,9 +25,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} import org.roaringbitmap.RoaringBitmap @@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ -import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf} import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils} /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index b8e466fab4..15c8de61b8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.serializer.JavaSerializer +import org.roaringbitmap.RoaringBitmap import scala.util.Random @@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite { val buf = ser.newInstance().serialize(status) ser.newInstance().deserialize[MapStatus](buf) } + + test("RoaringBitmap: runOptimize succeeded") { + val r = new RoaringBitmap + (1 to 200000).foreach(i => + if (i % 200 != 0) { + r.add(i) + } + ) + val size1 = r.getSizeInBytes + val success = r.runOptimize() + r.trim() + val size2 = r.getSizeInBytes + assert(size1 > size2) + assert(success) + } + + test("RoaringBitmap: runOptimize failed") { + val r = new RoaringBitmap + (1 to 200000).foreach(i => + if (i % 200 == 0) { + r.add(i) + } + ) + val size1 = r.getSizeInBytes + val success = r.runOptimize() + r.trim() + val size2 = r.getSizeInBytes + assert(size1 === size2) + assert(!success) + } } diff --git a/pom.xml b/pom.xml index 2a8a445057..940e2d8740 100644 --- a/pom.xml +++ b/pom.xml @@ -637,7 +637,7 @@ org.roaringbitmap RoaringBitmap - 0.4.5 + 0.5.11 commons-net