diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala
new file mode 100644
index 00000000..e677e0e4
--- /dev/null
+++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala
@@ -0,0 +1,55 @@
+/*
+Copyright 2014 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.chill.algebird
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.serializers.FieldSerializer
+
+import com.twitter.chill.IKryoRegistrar
+
+import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
+  HyperLogLogMonoid, Moments, SpaceSaver, DenseVector, SparseVector, AdaptiveVector}
+
+/**
+ * Kryo registrar that installs serializers for the Algebird types handled by this module.
+ */
+class AlgebirdRegistrar extends IKryoRegistrar {
+
+  /**
+   * Registers the Algebird serializers on the given Kryo instance.
+   *
+   * @param k the Kryo instance to configure; it is mutated in place
+   */
+  def apply(k: Kryo): Unit = {
+    // Dedicated serializers for some of the Algebird types that we use.
+    k.register(classOf[AveragedValue], new AveragedValueSerializer)
+    k.register(classOf[DecayedValue], new DecayedValueSerializer)
+    k.register(classOf[HyperLogLogMonoid], new HLLMonoidSerializer)
+    k.register(classOf[Moments], new MomentsSerializer)
+    // HLL is installed as a default serializer rather than registered for one class.
+    k.addDefaultSerializer(classOf[HLL], new HLLSerializer)
+
+    // AdaptiveVector is an IndexedSeq, so it would otherwise pick up the chill IndexedSeq
+    // serializer (which is its own bug); force Kryo's FieldSerializer for these types instead.
+    val denseClass = classOf[DenseVector[_]]
+    k.register(denseClass, new FieldSerializer[DenseVector[_]](k, denseClass))
+
+    val sparseClass = classOf[SparseVector[_]]
+    k.register(sparseClass, new FieldSerializer[SparseVector[_]](k, sparseClass))
+
+    k.addDefaultSerializer(classOf[AdaptiveVector[_]], classOf[FieldSerializer[_]])
+  }
+}
diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala
new file mode 100644
index 00000000..d5743822
--- /dev/null
+++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala
@@ -0,0 +1,89 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package com.twitter.chill.algebird
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input, Output}
+
+import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
+  HyperLogLogMonoid, Moments, SpaceSaver, SSOne, SSMany}
+
+import scala.collection.mutable.{Map => MMap}
+import scala.collection.immutable.SortedMap
+
+/** Kryo serializer for AveragedValue: the count as a long, followed by the value as a double. */
+class AveragedValueSerializer extends KSerializer[AveragedValue] {
+  setImmutable(true)
+  def write(kser: Kryo, out: Output, s: AveragedValue): Unit = {
+    out.writeLong(s.count, true)
+    out.writeDouble(s.value)
+  }
+  def read(kser: Kryo, in: Input, cls: Class[AveragedValue]): AveragedValue =
+    AveragedValue(in.readLong(true), in.readDouble())
+}
+
+/** Kryo serializer for Moments: m0 as a long, followed by m1 through m4 as doubles. */
+class MomentsSerializer extends KSerializer[Moments] {
+  setImmutable(true)
+  def write(kser: Kryo, out: Output, s: Moments): Unit = {
+    out.writeLong(s.m0, true)
+    out.writeDouble(s.m1)
+    out.writeDouble(s.m2)
+    out.writeDouble(s.m3)
+    out.writeDouble(s.m4)
+  }
+  def read(kser: Kryo, in: Input, cls: Class[Moments]): Moments =
+    Moments(in.readLong(true), in.readDouble(), in.readDouble(), in.readDouble(), in.readDouble())
+}
+
+/** Kryo serializer for DecayedValue: the value, followed by the scaledTime, both as doubles. */
+class DecayedValueSerializer extends KSerializer[DecayedValue] {
+  setImmutable(true)
+  def write(kser: Kryo, out: Output, s: DecayedValue): Unit = {
+    out.writeDouble(s.value)
+    out.writeDouble(s.scaledTime)
+  }
+  def read(kser: Kryo, in: Input, cls: Class[DecayedValue]): DecayedValue =
+    DecayedValue(in.readDouble(), in.readDouble())
+}
+
+/** Kryo serializer for HLL: a length prefix followed by the HyperLogLog.toBytes encoding. */
+class HLLSerializer extends KSerializer[HLL] {
+  setImmutable(true)
+  def write(kser: Kryo, out: Output, s: HLL): Unit = {
+    val encoded = HyperLogLog.toBytes(s)
+    out.writeInt(encoded.length, true)
+    out.writeBytes(encoded)
+  }
+  def read(kser: Kryo, in: Input, cls: Class[HLL]): HLL = {
+    val length = in.readInt(true)
+    HyperLogLog.fromBytes(in.readBytes(length))
+  }
+}
+
+/** Kryo serializer for HyperLogLogMonoid: only the monoid's bits value is written. */
+class HLLMonoidSerializer extends KSerializer[HyperLogLogMonoid] {
+  setImmutable(true)
+  // Cache of monoids created by read, keyed by bits, so repeated reads reuse one instance.
+  val hllMonoids = MMap[Int,HyperLogLogMonoid]()
+  def write(kser: Kryo, out: Output, mon: HyperLogLogMonoid): Unit =
+    out.writeInt(mon.bits, true)
+  def read(kser: Kryo, in: Input, cls: Class[HyperLogLogMonoid]): HyperLogLogMonoid = {
+    val bitCount = in.readInt(true)
+    hllMonoids.getOrElseUpdate(bitCount, new HyperLogLogMonoid(bitCount))
+  }
+}
diff --git a/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala b/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala
new file mode 100644
index 00000000..22168345
--- /dev/null
+++ b/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala
@@ -0,0 +1,81 @@
+/*
+Copyright 2014 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.twitter.chill.algebird
+
+import com.twitter.chill.{ KSerializer, ScalaKryoInstantiator, KryoPool }
+import com.twitter.algebird.{ AveragedValue, DecayedValue, HyperLogLogMonoid, MomentsGroup, AdaptiveVector }
+import org.specs.Specification
+
+/** Round-trips Algebird values through a Kryo pool configured with AlgebirdRegistrar. */
+class AlgebirdSerializersSpec extends Specification {
+  val kryo = {
+    val instantiate = () => {
+      val k = (new ScalaKryoInstantiator).newKryo
+      k.setReferences(false) // typical in production environment (scalding, spark)
+      (new AlgebirdRegistrar).apply(k)
+      k
+    }
+    KryoPool.withByteArrayOutputStream(1, instantiate)
+  }
+
+  /** Serializes x, deserializes it again and expects the copy to equal the original. */
+  def roundtrip[X](x: X): Unit = {
+    val serialized = kryo.toBytesWithClass(x)
+    val restored = kryo.fromBytes(serialized).asInstanceOf[X]
+    restored must_== x
+  }
+
+  /** Like roundtrip, but compares f applied to both values instead of the values themselves. */
+  def roundtripNoEq[X](x: X)(f: X => Any): Unit = {
+    val serialized = kryo.toBytesWithClass(x)
+    val restored = kryo.fromBytes(serialized).asInstanceOf[X]
+    f(restored) must_== f(x)
+  }
+
+  "kryo with AlgebirdRegistrar" should {
+    "serialize and deserialize AveragedValue" in {
+      roundtrip(AveragedValue(10L, 123.45))
+    }
+
+    "serialize and deserialize DecayedValue" in {
+      roundtrip(DecayedValue.build(3.14, 20.2, 9.33))
+    }
+
+    "serialize and deserialize HyperLogLogMonoid" in {
+      roundtripNoEq(new HyperLogLogMonoid(12))(_.bits)
+    }
+
+    "serialize and deserialize Moments" in {
+      roundtrip(MomentsGroup.zero)
+    }
+
+    "serialize and deserialize HLL" in {
+      val items = Seq(-127, 100, 23, 44, 15, 96, 10).map(x => Array(x.toByte))
+      val sparse = new HyperLogLogMonoid(4).create(Array(-127.toByte))
+      val dense = new HyperLogLogMonoid(4).batchCreate(items)
+      roundtrip(sparse)
+      roundtrip(dense)
+    }
+
+    "serialize and deserialize SparseVector and DenseVector" in {
+      val sparse = AdaptiveVector.fromVector(Vector(1,1,1,1,1,3), 1)
+      val dense = AdaptiveVector.fromVector(Vector(1,2,3,1,2,3), 1)
+      roundtrip(sparse)
+      roundtrip(dense)
+    }
+  }
+}
diff --git a/project/Build.scala b/project/Build.scala
index 
701d2491..0859b572 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -93,7 +93,8 @@ object ChillBuild extends Build {
     chillThrift,
     chillProtobuf,
     chillAkka,
-    chillAvro
+    chillAvro,
+    chillAlgebird
   )
 
   /**
@@ -208,4 +209,11 @@ object ChillBuild extends Build {
       "com.twitter" %% "bijection-avro" % "0.6.2"
     )
   ).dependsOn(chill,chillJava, chillBijection)
+
+  // chill-algebird: Kryo serializers for Algebird types; built on top of the core chill module.
+  lazy val chillAlgebird = module("algebird").settings(
+    libraryDependencies ++= Seq(
+      "com.twitter" %% "algebird-core" % "0.5.0"
+    )
+  ).dependsOn(chill)
 }