Skip to content

Commit

Permalink
Merge pull request #245 from twitter/Kryo3Upgrade
Browse files Browse the repository at this point in the history
Kryo 3.0.3 upgrade
  • Loading branch information
ianoc committed Feb 11, 2016
2 parents c37b59b + 1ec4dc3 commit 339e200
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 31 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
import scala.collection.JavaConverters._
import scalariform.formatter.preferences._

val kryoVersion = "2.21"
val kryoVersion = "3.0.3"
val bijectionVersion = "0.9.0"
val algebirdVersion = "0.12.0"

Expand All @@ -32,7 +32,7 @@ val sharedSettings = Project.defaultSettings ++ mimaDefaultSettings ++ scalarifo
libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.11.5" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"com.esotericsoftware.kryo" % "kryo" % kryoVersion
"com.esotericsoftware" % "kryo-shaded" % kryoVersion
),

parallelExecution in Test := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void open(InputStream in) throws IOException {

public Object deserialize(Object o) throws IOException {
// TODO, we could share these buffers if we see that alloc is bottlenecking
byte[] bytes = new byte[inputStream.readInt()];
byte[] bytes = new byte[Varint.readUnsignedVarInt(inputStream)];
inputStream.readFully( bytes );
return kryoPool.fromBytes(bytes, klass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void serialize(Object o) throws IOException {
try {
st.writeObject(o);
// Copy from buffer to output stream.
outputStream.writeInt(st.numOfWrittenBytes());
Varint.writeUnsignedVarInt(st.numOfWrittenBytes(), outputStream);
st.writeOutputTo(outputStream);
}
finally {
Expand Down
171 changes: 171 additions & 0 deletions chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Taken from org.apache.mahout.math
* https://github.com/apache/mahout
*/

package com.twitter.chill.hadoop;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
* <p>Encodes signed and unsigned values using a common variable-length
* scheme, found for example in
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
* but will use slightly more bytes to encode large values.</p>
*
* <p>Signed values are further encoded using so-called zig-zag encoding
* in order to make them "compatible" with variable-length encoding.</p>
*/
final class Varint {

private Varint() {
}

/**
* Encodes a value using the variable-length encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
* encode signed values. If values are known to be nonnegative,
* {@link #writeUnsignedVarLong(long, java.io.DataOutputStream)} should be used.
*
* @param value value to encode
* @param out to write bytes to
* @throws java.io.IOException if {@link java.io.DataOutput} throws {@link java.io.IOException}
*/
public static void writeSignedVarLong(long value, DataOutputStream out) throws IOException {
// Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
}

/**
* Encodes a value using the variable-length encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google Protocol Buffers</a>. Zig-zag is not used, so input must not be negative.
* If values can be negative, use {@link #writeSignedVarLong(long, java.io.DataOutputStream)}
* instead. This method treats negative input as like a large unsigned value.
*
* @param value value to encode
* @param out to write bytes to
* @throws java.io.IOException if {@link java.io.DataOutputStream} throws {@link java.io.IOException}
*/
public static void writeUnsignedVarLong(long value, DataOutputStream out) throws IOException {
while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
out.writeByte(((int) value & 0x7F) | 0x80);
value >>>= 7;
}
out.writeByte((int) value & 0x7F);
}

/**
* @see #writeSignedVarLong(long, java.io.DataOutputStream)
*/
public static void writeSignedVarInt(int value, DataOutputStream out) throws IOException {
// Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
}

/**
* @see #writeUnsignedVarLong(long, java.io.DataOutputStream)
*/
public static void writeUnsignedVarInt(int value, DataOutputStream out) throws IOException {
while ((value & 0xFFFFFF80) != 0L) {
out.writeByte((value & 0x7F) | 0x80);
value >>>= 7;
}
out.writeByte(value & 0x7F);
}

/**
* @param in to read bytes from
* @return decode value
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @throws IllegalArgumentException if variable-length value does not terminate
* after 9 bytes have been read
* @see #writeSignedVarLong(long, java.io.DataOutputStream)
*/
public static long readSignedVarLong(DataInputStream in) throws IOException {
long raw = readUnsignedVarLong(in);
// This undoes the trick in writeSignedVarLong()
long temp = (((raw << 63) >> 63) ^ raw) >> 1;
// This extra step lets us deal with the largest signed values by treating
// negative results from read unsigned methods as like unsigned values
// Must re-flip the top bit if the original read value had it set.
return temp ^ (raw & (1L << 63));
}

/**
* @param in to read bytes from
* @return decode value
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @throws IllegalArgumentException if variable-length value does not terminate
* after 9 bytes have been read
* @see #writeUnsignedVarLong(long, java.io.DataOutputStream)
*/
public static long readUnsignedVarLong(DataInputStream in) throws IOException {
long value = 0L;
int i = 0;
long b;
while (((b = in.readByte()) & 0x80L) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

/**
* @throws IllegalArgumentException if variable-length value does not terminate
* after 5 bytes have been read
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @see #readSignedVarLong(java.io.DataInputStream)
*/
public static int readSignedVarInt(DataInputStream in) throws IOException {
int raw = readUnsignedVarInt(in);
// This undoes the trick in writeSignedVarInt()
int temp = (((raw << 31) >> 31) ^ raw) >> 1;
// This extra step lets us deal with the largest signed values by treating
// negative results from read unsigned methods as like unsigned values.
// Must re-flip the top bit if the original read value had it set.
return temp ^ (raw & (1 << 31));
}

/**
* @throws IllegalArgumentException if variable-length value does not terminate
* after 5 bytes have been read
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @see #readUnsignedVarLong(java.io.DataInputStream)
*/
public static int readUnsignedVarInt(DataInputStream in) throws IOException {
int value = 0;
int i = 0;
int b;
while (((b = in.readByte()) & 0x80) != 0 && i < 42) {
value |= (b & 0x7F) << i;
i += 7;
}
if(i == 42) { // Over read!
throw new IllegalArgumentException("Read more than 5 bytes of data, must be invalid Var int");
}
return value | (b << i);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public ReflectingDefaultRegistrar(Class<T> cls, Class<? extends Serializer<?>> s
public Class<T> getRegisteredClass() { return klass; }
public Class<? extends Serializer<?>> getSerializerClass() { return serializerKlass; }
@Override
public void apply(Kryo k) { k.addDefaultSerializer(klass, k.newSerializer(serializerKlass, klass)); }
public void apply(Kryo k) { k.addDefaultSerializer(klass, serializerKlass); }

@Override
public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.Util;

/** Use reflection to instantiate a serializer.
* Used when serializer classes are written to config files
Expand All @@ -34,8 +35,16 @@ public ReflectingRegistrar(Class<T> cls, Class<? extends Serializer<?>> ser) {
klass = cls;
serializerKlass = ser;
}

@Override
public void apply(Kryo k) { k.register(klass, k.newSerializer(serializerKlass, klass)); }
public void apply(Kryo k) {
try {
k.register(klass, serializerKlass.newInstance());
} catch (Exception ex) {
throw new IllegalArgumentException("Unable to create serializer \"" + serializerKlass.getName() + "\" for class: "
+ Util.className(klass), ex);
}
}
@Override
public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); }

Expand Down
4 changes: 2 additions & 2 deletions chill-java/src/main/java/com/twitter/chill/SerDeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public void clear() {
public void setInput(byte[] in, int offset, int count) { input.setBuffer(in, offset, count); }
public void setInput(InputStream in) { input.setInputStream(in); }

public int numOfWrittenBytes() { return output.total(); }
public int numOfReadBytes() { return input.total(); }
public int numOfWrittenBytes() { return (int)output.total(); }
public int numOfReadBytes() { return (int)input.total(); }

// Common operations:
public <T> T readObject(Class<T> cls) {
Expand Down
46 changes: 24 additions & 22 deletions chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.objenesis.strategy.InstantiatorStrategy

import _root_.java.lang.reflect.{ Constructor, Modifier }

import scala.util.{ Try, Success, Failure }

/*
* This is the base class of Kryo we use to fix specific scala
* related issues discovered (ideally, this should be fixed in Kryo)
Expand Down Expand Up @@ -90,37 +92,37 @@ class KryoBase extends Kryo {
/* Fixes the case where Kryo's reflectasm doesn't work, even though it claims to
* TODO this should be fixed in Kryo. When it is, remove this
*/
override def newInstantiator(cls: Class[_]) = {
override def newInstantiator(cls: Class[_]) = newTypedInstantiator[AnyRef](cls.asInstanceOf[Class[AnyRef]])

private[this] def newTypedInstantiator[T](cls: Class[T]) = {
import Instantiators._
newOrElse(cls,
List(reflectAsm _, normalJava _),
List(reflectAsm[T](_), normalJava[T](_)),
// Or fall back on the strategy:
tryStrategy(cls).newInstantiatorOf(cls))
}
}

object Instantiators {
// Go through the list and use the first that works
def newOrElse(cls: Class[_],
it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator]],
elsefn: => ObjectInstantiator): ObjectInstantiator = {
def newOrElse[T](cls: Class[T],
it: TraversableOnce[Class[T] => Try[ObjectInstantiator[T]]],
elsefn: => ObjectInstantiator[T]): ObjectInstantiator[T] = {
// Just go through and try each one,
it.map { fn =>
fn(cls) match {
case Left(x) => None // ignore the exception
case Right(obji) => Some(obji)

it
.flatMap { fn =>
fn(cls).toOption
}
}
.find { _.isDefined } // Find the first Some(x), returns Some(Some(x))
.flatMap { x => x } // flatten
.find (_ => true) // first element in traversable once (no headOption defined.)
.getOrElse(elsefn)
}

// Use call by name:
def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator =
new ObjectInstantiator {
def forClass[T](t: Class[T])(fn: () => T): ObjectInstantiator[T] =
new ObjectInstantiator[T] {
override def newInstance() = {
try { fn().asInstanceOf[AnyRef] }
try { fn() }
catch {
case x: Exception => {
throw new KryoException("Error constructing instance of class: " + t.getName, x)
Expand All @@ -130,19 +132,19 @@ object Instantiators {
}

// This one tries reflectasm, which is a fast way of constructing an object
def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator] = {
def reflectAsm[T](t: Class[T]): Try[ObjectInstantiator[T]] = {
try {
val access = ConstructorAccess.get(t)
// Try it once, because this isn't always successful:
access.newInstance
// Okay, looks good:
Right(forClass(t) { () => access.newInstance() })
Success(forClass(t) { () => access.newInstance() })
} catch {
case x: Throwable => Left(x)
case x: Throwable => Failure(x)
}
}

def getConstructor(c: Class[_]): Constructor[_] = {
def getConstructor[T](c: Class[T]): Constructor[T] = {
try {
c.getConstructor()
} catch {
Expand All @@ -154,12 +156,12 @@ object Instantiators {
}
}

def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator] = {
def normalJava[T](t: Class[T]): Try[ObjectInstantiator[T]] = {
try {
val cons = getConstructor(t)
Right(forClass(t) { () => cons.newInstance() })
Success(forClass(t) { () => cons.newInstance() })
} catch {
case x: Throwable => Left(x)
case x: Throwable => Failure(x)
}
}
}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.7.5-SNAPSHOT"
version in ThisBuild := "0.8.0-SNAPSHOT"

0 comments on commit 339e200

Please sign in to comment.