Skip to content

Commit

Permalink
[SPARK-11016] Move RoaringBitmap to explicit Kryo serializer
Browse files Browse the repository at this point in the history
Fix the serialization of RoaringBitmap with Kyro serializer

This PR came from metamx/spark#1, thanks to drcrallen

Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>

Closes #9748 from davies/SPARK-11016.
  • Loading branch information
Davies Liu authored and kiszk committed Dec 26, 2015
1 parent 131cf69 commit 76b1f81
Showing 1 changed file with 55 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@

package org.apache.spark.serializer

import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
Expand Down Expand Up @@ -94,6 +94,9 @@ class KryoSerializer(conf: SparkConf)
for (cls <- KryoSerializer.toRegister) {
kryo.register(cls)
}
for ((cls, ser) <- KryoSerializer.toRegisterSerializer) {
kryo.register(cls, ser)
}

// For results returned by asJavaIterable. See JavaIterableWrapperSerializer.
kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer)
Expand Down Expand Up @@ -363,12 +366,6 @@ private[serializer] object KryoSerializer {
classOf[StorageLevel],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Byte]],
Expand All @@ -377,6 +374,55 @@ private[serializer] object KryoSerializer {
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)

private val toRegisterSerializer = Map[Class[_], KryoClassSerializer[_]](
classOf[RoaringBitmap] -> new KryoClassSerializer[RoaringBitmap]() {
override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
bitmap.serialize(new KryoOutputDataOutputBridge(output))
}
override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
val ret = new RoaringBitmap
ret.deserialize(new KryoInputDataInputBridge(input))
ret
}
}
)
}

private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
override def readLong(): Long = input.readLong()
override def readChar(): Char = input.readChar()
override def readFloat(): Float = input.readFloat()
override def readByte(): Byte = input.readByte()
override def readShort(): Short = input.readShort()
override def readUTF(): String = input.readString() // readString in kryo does utf8
override def readInt(): Int = input.readInt()
override def readUnsignedShort(): Int = input.readShortUnsigned()
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
override def readFully(b: Array[Byte]): Unit = input.read(b)
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
override def readLine(): String = throw new UnsupportedOperationException("readLine")
override def readBoolean(): Boolean = input.readBoolean()
override def readUnsignedByte(): Int = input.readByteUnsigned()
override def readDouble(): Double = input.readDouble()
}

private[serializer] class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
override def writeFloat(v: Float): Unit = output.writeFloat(v)
// There is no "readChars" counterpart, except maybe "readLine", which is not supported
override def writeChars(s: String): Unit = throw new UnsupportedOperationException("writeChars")
override def writeDouble(v: Double): Unit = output.writeDouble(v)
override def writeUTF(s: String): Unit = output.writeString(s) // writeString in kryo does UTF8
override def writeShort(v: Int): Unit = output.writeShort(v)
override def writeInt(v: Int): Unit = output.writeInt(v)
override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
override def write(b: Int): Unit = output.write(b)
override def write(b: Array[Byte]): Unit = output.write(b)
override def write(b: Array[Byte], off: Int, len: Int): Unit = output.write(b, off, len)
override def writeBytes(s: String): Unit = output.writeString(s)
override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
override def writeLong(v: Long): Unit = output.writeLong(v)
override def writeByte(v: Int): Unit = output.writeByte(v)
}

/**
Expand Down

0 comments on commit 76b1f81

Please sign in to comment.