Kryo 3.0.3 upgrade #245

Merged
merged 20 commits into from
Feb 11, 2016
Changes from 12 commits
@@ -46,7 +46,7 @@ public void open(InputStream in) throws IOException {

public Object deserialize(Object o) throws IOException {
// TODO, we could share these buffers if we see that alloc is bottlenecking
byte[] bytes = new byte[inputStream.readInt()];
byte[] bytes = new byte[(int) Varint.readUnsignedVarLong(inputStream)];
Collaborator

Is there no int variant? Should we throw here if the long > Int.MaxValue?

Collaborator

Oh, the code is below. Why not unsigned varint?

Collaborator Author

I think this flowed out of the Kryo API for bytes written returning a long. At the entry point to that I go to an int now, so everything else just ends up being integers.

inputStream.readFully( bytes );
return kryoPool.fromBytes(bytes, klass);
}
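
Note on the overflow question raised in the comments on this hunk: the cast from the decoded long to an int could be guarded with an explicit range check. The following is a minimal sketch only, assuming a hypothetical helper class named LengthPrefix that is not part of chill or Kryo:

```java
// Hypothetical helper (an assumption for illustration, not chill code):
// narrows a decoded length prefix to an int and fails loudly instead of
// silently truncating.
final class LengthPrefix {
    private LengthPrefix() {
    }

    static int checkedLength(long length) {
        // A negative value means the varint had its top bit set; a value above
        // Integer.MAX_VALUE cannot be used as a Java array size.
        if (length < 0 || length > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Invalid length prefix: " + length);
        }
        return (int) length;
    }

    public static void main(String[] args) {
        System.out.println(checkedLength(1024L));
        try {
            checkedLength(1L << 32);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Without such a check, a plain cast of 1L << 32 yields 0, so the reader would allocate an empty array and then misinterpret the rest of the stream.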
@@ -47,7 +47,7 @@ public void serialize(Object o) throws IOException {
try {
st.writeObject(o);
// Copy from buffer to output stream.
outputStream.writeInt(st.numOfWrittenBytes());
Varint.writeUnsignedVarLong(st.numOfWrittenBytes(), outputStream);
st.writeOutputTo(outputStream);
}
finally {
168 changes: 168 additions & 0 deletions chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java
@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Taken from org.apache.mahout.math
* https://github.com/apache/mahout
*/

package com.twitter.chill.hadoop;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
* <p>Encodes signed and unsigned values using a common variable-length
* scheme, found for example in
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values,
* but will use slightly more bytes to encode large values.</p>
*
* <p>Signed values are further encoded using so-called zig-zag encoding
* in order to make them "compatible" with variable-length encoding.</p>
*/
final class Varint {

private Varint() {
}

/**
* Encodes a value using the variable-length encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google Protocol Buffers</a>. It uses zig-zag encoding to efficiently
* encode signed values. If values are known to be nonnegative,
* {@link #writeUnsignedVarLong(long, java.io.DataOutputStream)} should be used.
*
* @param value value to encode
* @param out to write bytes to
* @throws java.io.IOException if {@link java.io.DataOutput} throws {@link java.io.IOException}
*/
public static void writeSignedVarLong(long value, DataOutputStream out) throws IOException {
// Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
writeUnsignedVarLong((value << 1) ^ (value >> 63), out);
}

/**
* Encodes a value using the variable-length encoding from
* <a href="http://code.google.com/apis/protocolbuffers/docs/encoding.html">
* Google Protocol Buffers</a>. Zig-zag is not used, so input must not be negative.
* If values can be negative, use {@link #writeSignedVarLong(long, java.io.DataOutputStream)}
* instead. This method treats negative input as like a large unsigned value.
*
* @param value value to encode
* @param out to write bytes to
* @throws java.io.IOException if {@link java.io.DataOutputStream} throws {@link java.io.IOException}
*/
public static void writeUnsignedVarLong(long value, DataOutputStream out) throws IOException {
while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) {
out.writeByte(((int) value & 0x7F) | 0x80);
value >>>= 7;
}
out.writeByte((int) value & 0x7F);
}

/**
* @see #writeSignedVarLong(long, java.io.DataOutputStream)
*/
public static void writeSignedVarInt(int value, DataOutputStream out) throws IOException {
// Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
writeUnsignedVarInt((value << 1) ^ (value >> 31), out);
}

/**
* @see #writeUnsignedVarLong(long, java.io.DataOutputStream)
*/
public static void writeUnsignedVarInt(int value, DataOutputStream out) throws IOException {
while ((value & 0xFFFFFF80) != 0L) {
out.writeByte((value & 0x7F) | 0x80);
value >>>= 7;
}
out.writeByte(value & 0x7F);
}

/**
* @param in to read bytes from
* @return decode value
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @throws IllegalArgumentException if variable-length value does not terminate
* after 9 bytes have been read
* @see #writeSignedVarLong(long, java.io.DataOutputStream)
*/
public static long readSignedVarLong(DataInputStream in) throws IOException {
long raw = readUnsignedVarLong(in);
// This undoes the trick in writeSignedVarLong()
long temp = (((raw << 63) >> 63) ^ raw) >> 1;
// This extra step lets us deal with the largest signed values by treating
// negative results from read unsigned methods as like unsigned values
// Must re-flip the top bit if the original read value had it set.
return temp ^ (raw & (1L << 63));
}

/**
* @param in to read bytes from
* @return decode value
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @throws IllegalArgumentException if variable-length value does not terminate
* after 9 bytes have been read
* @see #writeUnsignedVarLong(long, java.io.DataOutputStream)
*/
public static long readUnsignedVarLong(DataInputStream in) throws IOException {
long value = 0L;
int i = 0;
long b;
while (((b = in.readByte()) & 0x80L) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

/**
* @throws IllegalArgumentException if variable-length value does not terminate
* after 5 bytes have been read
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @see #readSignedVarLong(java.io.DataInputStream)
*/
public static int readSignedVarInt(DataInputStream in) throws IOException {
int raw = readUnsignedVarInt(in);
// This undoes the trick in writeSignedVarInt()
int temp = (((raw << 31) >> 31) ^ raw) >> 1;
// This extra step lets us deal with the largest signed values by treating
// negative results from read unsigned methods as like unsigned values.
// Must re-flip the top bit if the original read value had it set.
return temp ^ (raw & (1 << 31));
}

/**
* @throws IllegalArgumentException if variable-length value does not terminate
Collaborator

This is a lie, right?

Collaborator Author

Looks like one (this code isn't mine). Add a test, or kill the comment?

        while (((b = in.readByte()) & 0x80) != 0 && i < 35) {
            value |= (b & 0x7F) << i;
            i += 7;
        }
        if (i == 35) { // Over read! A sixth byte was consumed.
            throw new IllegalArgumentException("Read more than 5 bytes of data, must be invalid Var int");
        }

Looks like it would fix this one.
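
The bounds check discussed in this thread can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical class named BoundedVarint (not part of this pull request); it rejects a continuation bit on the fifth byte, so at most five bytes are consumed before the exception:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical class for illustration only; not the code merged in this PR.
final class BoundedVarint {
    private BoundedVarint() {
    }

    static int readUnsignedVarInt(DataInputStream in) throws IOException {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = in.readByte()) & 0x80) != 0) {
            if (shift >= 28) {
                // The fifth byte still has its continuation bit set.
                throw new IllegalArgumentException("Varint does not terminate after 5 bytes");
            }
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        // The last byte has no continuation bit, so it is non-negative here.
        return value | (b << shift);
    }

    // Convenience wrapper so callers can decode from a byte array.
    static int decode(byte[] bytes) {
        try {
            return readUnsignedVarInt(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(new byte[] {(byte) 0xAC, 0x02}));
        try {
            decode(new byte[] {(byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, (byte) 0x80, 0x01});
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A test along these lines would cover both the normal path and the over-long input that the Javadoc promises to reject.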

* after 5 bytes have been read
* @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException}
* @see #readUnsignedVarLong(java.io.DataInputStream)
*/
public static int readUnsignedVarInt(DataInputStream in) throws IOException {
int value = 0;
int i = 0;
int b;
while (((b = in.readByte()) & 0x80) != 0) {
value |= (b & 0x7F) << i;
i += 7;
}
return value | (b << i);
}

}
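
For readers unfamiliar with the encoding in Varint.java, a small worked example may help. This is a sketch under assumptions: the class name VarintExample and its methods are hypothetical, and unZigZag uses the common Protocol Buffers form rather than the exact expression in readSignedVarLong above.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of the encoding; not part of this pull request.
final class VarintExample {
    private VarintExample() {
    }

    // Zig-zag maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ... so that
    // small negative numbers also encode to few bytes.
    static long zigZag(long value) {
        return (value << 1) ^ (value >> 63);
    }

    // Inverse mapping, in the form commonly used with Protocol Buffers.
    static long unZigZag(long raw) {
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Emits 7 bits per byte, low bits first; the high bit of each byte
    // marks "more bytes follow".
    static byte[] encodeUnsigned(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0L) {
            out.write(((int) value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write((int) value & 0x7F);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        for (byte b : encodeUnsigned(300L)) {
            System.out.printf("%02X ", b & 0xFF);
        }
        System.out.println();
        System.out.println(zigZag(-1L) + " " + zigZag(1L));
    }
}
```

Here 300 encodes to the two bytes AC 02, and any value below 128 fits in one byte, compared with the fixed four bytes that writeInt used for the length prefix. The trade-off is that values close to Integer.MAX_VALUE need five bytes.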
6 changes: 3 additions & 3 deletions chill-java/src/main/java/com/twitter/chill/Base64.java
@@ -6,7 +6,7 @@
* <p>Example:</p>
*
* <code>String encoded = Base64.encode( myByteArray );</code>
* <br />
* <br>
* <code>byte[] myByteArray = Base64.decode( encoded );</code>
*
* <p>The <tt>options</tt> parameter, which appears in a few places, is used to pass
@@ -1668,7 +1668,7 @@ public InputStream( java.io.InputStream in ) {
* Valid options:<pre>
* ENCODE or DECODE: Encode or Decode as data is read.
* DO_BREAK_LINES: break lines at 76 characters
* (only meaningful when encoding)</i>
* <i>(only meaningful when encoding)</i>
* </pre>
* <p>
* Example: <code>new Base64.InputStream( in, Base64.DECODE )</code>
@@ -1881,7 +1881,7 @@ public OutputStream( java.io.OutputStream out ) {
* Valid options:<pre>
* ENCODE or DECODE: Encode or Decode as data is read.
* DO_BREAK_LINES: don't break lines at 76 characters
* (only meaningful when encoding)</i>
* <i>(only meaningful when encoding)</i>
* </pre>
* <p>
* Example: <code>new Base64.OutputStream( out, Base64.ENCODE )</code>
@@ -32,7 +32,7 @@ public ReflectingDefaultRegistrar(Class<T> cls, Class<? extends Serializer<?>> s
public Class<T> getRegisteredClass() { return klass; }
public Class<? extends Serializer<?>> getSerializerClass() { return serializerKlass; }
@Override
public void apply(Kryo k) { k.addDefaultSerializer(klass, k.newSerializer(serializerKlass, klass)); }
public void apply(Kryo k) { k.addDefaultSerializer(klass, serializerKlass); }

@Override
public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); }
@@ -18,6 +18,7 @@

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.util.Util;

/** Use reflection to instantiate a serializer.
* Used when serializer classes are written to config files
@@ -34,8 +35,16 @@ public ReflectingRegistrar(Class<T> cls, Class<? extends Serializer<?>> ser) {
klass = cls;
serializerKlass = ser;
}

@Override
public void apply(Kryo k) { k.register(klass, k.newSerializer(serializerKlass, klass)); }
public void apply(Kryo k) {
try {
k.register(klass, serializerKlass.newInstance());
} catch (Exception ex) {
throw new IllegalArgumentException("Unable to create serializer \"" + serializerKlass.getName() + "\" for class: "
+ Util.className(klass), ex);
}
}
@Override
public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); }
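
The new apply method above instantiates the serializer reflectively and wraps any failure in an IllegalArgumentException. The pattern can be sketched without Kryo as follows; the class name ReflectiveFactory is hypothetical, and the sketch uses getDeclaredConstructor().newInstance() in place of the Class.newInstance() call shown in the diff:

```java
// Hypothetical, Kryo-free illustration of the pattern; not code from this PR.
final class ReflectiveFactory {
    private ReflectiveFactory() {
    }

    // Creates an instance through the no-argument constructor and reports
    // any reflection failure with the offending class name attached.
    static <T> T instantiate(Class<? extends T> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (Exception ex) {
            throw new IllegalArgumentException(
                "Unable to create instance of \"" + type.getName() + "\"", ex);
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = instantiate(StringBuilder.class);
        System.out.println(sb.length());
        try {
            // Integer has no no-argument constructor, so this fails.
            instantiate(Integer.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One consequence of this approach is that the serializer class must expose a no-argument constructor.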

4 changes: 2 additions & 2 deletions chill-java/src/main/java/com/twitter/chill/SerDeState.java
@@ -50,8 +50,8 @@ public void clear() {
public void setInput(byte[] in, int offset, int count) { input.setBuffer(in, offset, count); }
public void setInput(InputStream in) { input.setInputStream(in); }

public int numOfWrittenBytes() { return output.total(); }
public int numOfReadBytes() { return input.total(); }
public long numOfWrittenBytes() { return output.total(); }
public long numOfReadBytes() { return input.total(); }

// Common operations:
public <T> T readObject(Class<T> cls) {
@@ -40,7 +40,7 @@ public class ConfiguredInstantiator extends KryoInstantiator {
protected final KryoInstantiator delegate;

/** Key we use to configure this class.
* Format: <class of KryoInstantiator>(:<base64 serialized instantiator>)
* Format: &lt;class of KryoInstantiator&gt;(:&lt;base64 serialized instantiator&gt;)
* if there is no serialized instantiator, we use the reflected instance
* as the delegate
*/
12 changes: 6 additions & 6 deletions chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala
@@ -102,8 +102,8 @@ class KryoBase extends Kryo {
object Instantiators {
// Go through the list and use the first that works
def newOrElse(cls: Class[_],
it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator]],
elsefn: => ObjectInstantiator): ObjectInstantiator = {
it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator[AnyRef]]],
elsefn: => ObjectInstantiator[_]): ObjectInstantiator[_] = {
// Just go through and try each one,
it.map { fn =>
fn(cls) match {
@@ -117,8 +117,8 @@ object Instantiators {
}

// Use call by name:
def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator =
new ObjectInstantiator {
def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator[AnyRef] =
Collaborator

Shouldn't this be:

def forClass[T](t: Class[T])(fn: () => T): ObjectInstantiator[T] =
    new ObjectInstantiator[T] {
       override def newInstance() = {
         try { fn() }

new ObjectInstantiator[AnyRef] {
override def newInstance() = {
try { fn().asInstanceOf[AnyRef] }
catch {
@@ -130,7 +130,7 @@ object Instantiators {
}

// This one tries reflectasm, which is a fast way of constructing an object
def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator] = {
def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = {
Collaborator

Also, can we add the T parameter here?

try {
val access = ConstructorAccess.get(t)
// Try it once, because this isn't always successful:
@@ -154,7 +154,7 @@ object Instantiators {
}
}

def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator] = {
def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = {
Collaborator

Add the T parameter here as well.

try {
val cons = getConstructor(t)
Right(forClass(t) { () => cons.newInstance() })
7 changes: 3 additions & 4 deletions project/Build.scala
@@ -10,8 +10,8 @@ import com.typesafe.sbt.SbtScalariform._
import scala.collection.JavaConverters._

object ChillBuild extends Build {
val kryoVersion = "2.21"

val kryoVersion = "3.0.3"
val bijectionVersion = "0.8.1"
val algebirdVersion = "0.11.0"

@@ -27,9 +27,8 @@ object ChillBuild extends Build {
scalacOptions ++= Seq("-unchecked", "-deprecation"),
ScalariformKeys.preferences := formattingPreferences,

// Twitter Hadoop needs this, sorry 1.7 fans
javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:-options"),
javacOptions in doc := Seq("-source", "1.6"),
javacOptions in doc := Seq("-source", "1.7"),

resolvers ++= Seq(
Opts.resolver.sonatypeSnapshots,
@@ -38,7 +37,7 @@ object ChillBuild extends Build {
libraryDependencies ++= Seq(
"org.scalacheck" %% "scalacheck" % "1.11.5" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test",
"com.esotericsoftware.kryo" % "kryo" % kryoVersion
"com.esotericsoftware" % "kryo-shaded" % kryoVersion
),

parallelExecution in Test := true,