From d59bfdb2703db627360ed88ba2b4fdf7d95f9963 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Thu, 13 Mar 2014 19:07:24 -0400 Subject: [PATCH 1/4] add chill-algebird project by copying AlgebirdSerializers from of scalding --- .../chill/algebird/AlgebirdSerializers.scala | 118 ++++++++++++++++++ project/Build.scala | 9 +- 2 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala new file mode 100644 index 00000000..467c99e7 --- /dev/null +++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala @@ -0,0 +1,118 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.chill.algebird + +import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.{Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input, Output} + +import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog, + HyperLogLogMonoid, Moments, SpaceSaver, SSOne, SSMany} + +import scala.collection.mutable.{Map => MMap} +import scala.collection.immutable.SortedMap + +class AveragedValueSerializer extends KSerializer[AveragedValue] { + setImmutable(true) + def write(kser: Kryo, out : Output, s : AveragedValue) { + out.writeLong(s.count, true) + out.writeDouble(s.value) + } + def read(kser : Kryo, in : Input, cls : Class[AveragedValue]) : AveragedValue = + AveragedValue(in.readLong(true), in.readDouble) +} + +class MomentsSerializer extends KSerializer[Moments] { + setImmutable(true) + def write(kser: Kryo, out : Output, s : Moments) { + out.writeLong(s.m0, true) + out.writeDouble(s.m1) + out.writeDouble(s.m2) + out.writeDouble(s.m3) + out.writeDouble(s.m4) + } + def read(kser : Kryo, in : Input, cls : Class[Moments]) : Moments = { + Moments(in.readLong(true), + in.readDouble, + in.readDouble, + in.readDouble, + in.readDouble) + } +} + + +class DecayedValueSerializer extends KSerializer[DecayedValue] { + setImmutable(true) + def write(kser: Kryo, out : Output, s : DecayedValue) { + out.writeDouble(s.value) + out.writeDouble(s.scaledTime) + } + def read(kser : Kryo, in : Input, cls : Class[DecayedValue]) : DecayedValue = + DecayedValue(in.readDouble, in.readDouble) +} + +class HLLSerializer extends KSerializer[HLL] { + setImmutable(true) + def write(kser: Kryo, out : Output, s : HLL) { + val bytes = HyperLogLog.toBytes(s) + out.writeInt(bytes.size, true) + out.writeBytes(bytes) + } + def read(kser : Kryo, in : Input, cls : Class[HLL]) : HLL = { + HyperLogLog.fromBytes(in.readBytes(in.readInt(true))) + } +} + +class HLLMonoidSerializer extends KSerializer[HyperLogLogMonoid] { + setImmutable(true) + val hllMonoids = MMap[Int,HyperLogLogMonoid]() + def write(kser: Kryo, out : Output, mon : HyperLogLogMonoid) { + out.writeInt(mon.bits, true) + } + def read(kser : Kryo, in : Input, cls : Class[HyperLogLogMonoid]) : HyperLogLogMonoid = { + val bits = in.readInt(true) + hllMonoids.getOrElseUpdate(bits, new HyperLogLogMonoid(bits)) + } +} + +// class SpaceSaverSerializer[T] extends KSerializer[SpaceSaver[T]] { +// setImmutable(true) +// def write(kser: Kryo, out: Output, s: SpaceSaver[T]) { +// s match { +// case (one: SSOne[_]) => { +// out.writeByte(1) +// out.writeInt(one.capacity, true) +// kser.writeClassAndObject(out, one.item) +// } +// case (many: SSMany[_]) => { +// out.writeByte(2) +// out.writeInt(many.capacity, true) +// kser.writeClassAndObject(out, many.counters) +// kser.writeClassAndObject(out, many.bucketsOption) +// } +// } +// } +// def read(kser: Kryo, in: Input, cls: Class[SpaceSaver[T]]): SpaceSaver[T] = { +// in.readByte match { +// case 1 => SSOne[T](in.readInt(true), kser.readClassAndObject(in).asInstanceOf[T]) +// case 2 => SSMany[T]( +// in.readInt(true), +// kser.readClassAndObject(in).asInstanceOf[Map[T, (Long, Long)]], +// kser.readClassAndObject(in).asInstanceOf[Option[SortedMap[Long, Set[T]]]] +// ) +// } +// } +// } diff --git a/project/Build.scala b/project/Build.scala index 701d2491..0859b572 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -93,7 +93,8 @@ object ChillBuild extends Build { chillThrift, chillProtobuf, chillAkka, - chillAvro + chillAvro, + chillAlgebird ) /** @@ -208,4 +209,10 @@ object ChillBuild extends Build { "com.twitter" %% "bijection-avro" % "0.6.2" ) ).dependsOn(chill,chillJava, chillBijection) + + lazy val chillAlgebird = module("algebird").settings( + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % "0.5.0" + ) + ).dependsOn(chill) } From c92d127e7ca2a7affc5a6e79edf8067b34401321 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Mon, 17 Mar 2014 12:31:35 -0400 Subject: [PATCH 2/4] add AlgebirdRegistrar that registers serializers --- .../chill/algebird/AlgebirdRegistrar.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala new file mode 100644 index 00000000..18b371f5 --- /dev/null +++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala @@ -0,0 +1,40 @@ +/* +Copyright 2014 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package com.twitter.chill.algebird + +import com.esotericsoftware.kryo.Kryo +import com.twitter.chill.{ RichKryo, IKryoRegistrar } + +import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog, + HyperLogLogMonoid, Moments, SpaceSaver} + +object AlgebirdRegistrar { + implicit def toRich(k: Kryo): RichKryo = new RichKryo(k) +} + +class AlgebirdRegistrar extends IKryoRegistrar { + import AlgebirdRegistrar._ + + def apply(k: Kryo) { + k + .forClass[AveragedValue](new AveragedValueSerializer) + .forClass[Moments](new MomentsSerializer) + .forClass[DecayedValue](new DecayedValueSerializer) + .forSubclass[HLL](new HLLSerializer) + .forClass[HyperLogLogMonoid](new HLLMonoidSerializer()) + //.forSubclass[SpaceSaver[Any]](new SpaceSaverSerializer[Any]) + } +} From 50baafd0870a7f013f2fbf0113c5df99f9f76af0 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Mon, 31 Mar 2014 20:22:38 -0400 Subject: [PATCH 3/4] AlgebirdRegistrar does all algebird registrations currently found in KryoHadoop --- .../chill/algebird/AlgebirdRegistrar.scala | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala index 18b371f5..07c4979b 100644 --- a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala +++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala @@ -16,25 +16,32 @@ limitations under the License. package com.twitter.chill.algebird import com.esotericsoftware.kryo.Kryo -import com.twitter.chill.{ RichKryo, IKryoRegistrar } +import com.esotericsoftware.kryo.serializers.FieldSerializer + +import com.twitter.chill.IKryoRegistrar import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog, - HyperLogLogMonoid, Moments, SpaceSaver} + HyperLogLogMonoid, Moments, SpaceSaver, DenseVector, SparseVector, AdaptiveVector} -object AlgebirdRegistrar { - implicit def toRich(k: Kryo): RichKryo = new RichKryo(k) -} class AlgebirdRegistrar extends IKryoRegistrar { - import AlgebirdRegistrar._ def apply(k: Kryo) { - k - .forClass[AveragedValue](new AveragedValueSerializer) - .forClass[Moments](new MomentsSerializer) - .forClass[DecayedValue](new DecayedValueSerializer) - .forSubclass[HLL](new HLLSerializer) - .forClass[HyperLogLogMonoid](new HLLMonoidSerializer()) - //.forSubclass[SpaceSaver[Any]](new SpaceSaverSerializer[Any]) + // Some of the monoids from Algebird that we use: + k.register(classOf[AveragedValue], new AveragedValueSerializer) + k.register(classOf[DecayedValue], new DecayedValueSerializer) + k.register(classOf[HyperLogLogMonoid], new HLLMonoidSerializer) + k.register(classOf[Moments], new MomentsSerializer) + k.addDefaultSerializer(classOf[HLL], new HLLSerializer) + //k.addDefaultSerializer(classOf[SpaceSaver[_]], new SpaceSaverSerializer[_]) + + /** AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer + * (which is its own bug), force using the fields serializer here + */ + k.register(classOf[DenseVector[_]], new FieldSerializer[DenseVector[_]](k, classOf[DenseVector[_]])) + + k.register(classOf[SparseVector[_]], new FieldSerializer[SparseVector[_]](k, classOf[SparseVector[_]])) + + k.addDefaultSerializer(classOf[AdaptiveVector[_]], classOf[FieldSerializer[_]]) } } From ede7955f935c479546abba7b78b0a5ec6c931b39 Mon Sep 17 00:00:00 2001 From: Koert Kuipers Date: Mon, 28 Apr 2014 18:12:32 -0400 Subject: [PATCH 4/4] add some basic unit tests --- .../chill/algebird/AlgebirdRegistrar.scala | 1 - .../chill/algebird/AlgebirdSerializers.scala | 29 ------- .../algebird/AlgebirdSerializersSpec.scala | 81 +++++++++++++++++++ 3 files changed, 81 insertions(+), 30 deletions(-) create mode 100644 chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala index 07c4979b..e677e0e4 100644 --- a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala +++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdRegistrar.scala @@ -33,7 +33,6 @@ class AlgebirdRegistrar extends IKryoRegistrar { k.register(classOf[HyperLogLogMonoid], new HLLMonoidSerializer) k.register(classOf[Moments], new MomentsSerializer) k.addDefaultSerializer(classOf[HLL], new HLLSerializer) - //k.addDefaultSerializer(classOf[SpaceSaver[_]], new SpaceSaverSerializer[_]) /** AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer * (which is its own bug), force using the fields serializer here diff --git a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala index 467c99e7..d5743822 100644 --- a/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala +++ b/chill-algebird/src/main/scala/com/twitter/chill/algebird/AlgebirdSerializers.scala @@ -87,32 +87,3 @@ class HLLMonoidSerializer extends KSerializer[HyperLogLogMonoid] { hllMonoids.getOrElseUpdate(bits, new HyperLogLogMonoid(bits)) } } - -// class SpaceSaverSerializer[T] extends KSerializer[SpaceSaver[T]] { -// setImmutable(true) -// def write(kser: Kryo, out: Output, s: SpaceSaver[T]) { -// s match { -// case (one: SSOne[_]) => { -// out.writeByte(1) -// out.writeInt(one.capacity, true) -// kser.writeClassAndObject(out, one.item) -// } -// case (many: SSMany[_]) => { -// out.writeByte(2) -// out.writeInt(many.capacity, true) -// kser.writeClassAndObject(out, many.counters) -// kser.writeClassAndObject(out, many.bucketsOption) -// } -// } -// } -// def read(kser: Kryo, in: Input, cls: Class[SpaceSaver[T]]): SpaceSaver[T] = { -// in.readByte match { -// case 1 => SSOne[T](in.readInt(true), kser.readClassAndObject(in).asInstanceOf[T]) -// case 2 => SSMany[T]( -// in.readInt(true), -// kser.readClassAndObject(in).asInstanceOf[Map[T, (Long, Long)]], -// kser.readClassAndObject(in).asInstanceOf[Option[SortedMap[Long, Set[T]]]] -// ) -// } -// } -// } diff --git a/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala b/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala new file mode 100644 index 00000000..22168345 --- /dev/null +++ b/chill-algebird/src/test/scala/com/twitter/chill/algebird/AlgebirdSerializersSpec.scala @@ -0,0 +1,81 @@ +/* +Copyright 2014 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.twitter.chill.algebird + +import com.twitter.chill.{ KSerializer, ScalaKryoInstantiator, KryoPool } +import com.twitter.algebird.{ AveragedValue, DecayedValue, HyperLogLogMonoid, MomentsGroup, AdaptiveVector } +import org.specs.Specification + +class AlgebirdSerializersSpec extends Specification { + val kryo = { + val inst = () => { + val newK = (new ScalaKryoInstantiator).newKryo + newK.setReferences(false) // typical in production environment (scalding, spark) + (new AlgebirdRegistrar).apply(newK) + newK + } + KryoPool.withByteArrayOutputStream(1, inst) + } + + def roundtrip[X](x: X) { + val bytes = kryo.toBytesWithClass(x) + //println("bytes size : " + bytes.size) + //println("bytes: " + new String(bytes, "UTF-8")) + val result = kryo.fromBytes(bytes).asInstanceOf[X] + result must_== x + } + + def roundtripNoEq[X](x: X)(f: X => Any) { + val bytes = kryo.toBytesWithClass(x) + val result = kryo.fromBytes(bytes).asInstanceOf[X] + f(result) must_== f(x) + } + + + "kryo with AlgebirdRegistrar" should { + "serialize and deserialize AveragedValue" in { + roundtrip(AveragedValue(10L, 123.45)) + } + + "serialize and deserialize DecayedValue" in { + roundtrip(DecayedValue.build(3.14, 20.2, 9.33)) + } + + "serialize and deserialize HyperLogLogMonoid" in { + roundtripNoEq(new HyperLogLogMonoid(12))(_.bits) + } + + "serialize and deserialize Moments" in { + roundtrip(MomentsGroup.zero) + } + + "serialize and deserialize HLL" in { + val sparse = new HyperLogLogMonoid(4).create(Array(-127.toByte)) + val dense = new HyperLogLogMonoid(4).batchCreate(Seq(-127, 100, 23, 44, 15, 96, 10).map(x => Array(x.toByte))) + roundtrip(sparse) + roundtrip(dense) + } + + "serialize and deserialize SparseVector and DenseVector" in { + val sparse = AdaptiveVector.fromVector(Vector(1,1,1,1,1,3), 1) + val dense = AdaptiveVector.fromVector(Vector(1,2,3,1,2,3), 1) + roundtrip(sparse) + roundtrip(dense) + } + + } +}