From dcef00fa184badeb6ff7fea79b0e3ea6ff916023 Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Sun, 7 Jun 2015 10:02:51 -0400 Subject: [PATCH 01/17] Bumped kryo version to 3.0.1 --- project/Build.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 1d9ae226..a3228430 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -10,7 +10,7 @@ import com.typesafe.sbt.SbtScalariform._ import scala.collection.JavaConverters._ object ChillBuild extends Build { - val kryoVersion = "2.21" + val kryoVersion = "3.0.1" def isScala210x(scalaVersion: String) = scalaVersion match { @@ -38,7 +38,7 @@ object ChillBuild extends Build { libraryDependencies ++= Seq( "org.scalacheck" %% "scalacheck" % "1.11.5" % "test", "org.scalatest" %% "scalatest" % "2.2.2" % "test", - "com.esotericsoftware.kryo" % "kryo" % kryoVersion + "com.esotericsoftware" % "kryo-shaded" % kryoVersion ), parallelExecution in Test := true, From 2a064a4e83d9b7e832ae7b09c713aa46527db85c Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Sun, 7 Jun 2015 10:03:52 -0400 Subject: [PATCH 02/17] Changed numOfWrittenBytes to long to match Kryo --- chill-java/src/main/java/com/twitter/chill/SerDeState.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chill-java/src/main/java/com/twitter/chill/SerDeState.java b/chill-java/src/main/java/com/twitter/chill/SerDeState.java index 3a023594..640ba6f0 100644 --- a/chill-java/src/main/java/com/twitter/chill/SerDeState.java +++ b/chill-java/src/main/java/com/twitter/chill/SerDeState.java @@ -50,8 +50,8 @@ public void clear() { public void setInput(byte[] in, int offset, int count) { input.setBuffer(in, offset, count); } public void setInput(InputStream in) { input.setInputStream(in); } - public int numOfWrittenBytes() { return output.total(); } - public int numOfReadBytes() { return input.total(); } + public long numOfWrittenBytes() { return output.total(); } + public long numOfReadBytes() { return input.total(); } // Common operations: public T readObject(Class cls) { From bb52eee9118acf3a9241f83c2fd69a595ef3683f Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Sun, 7 Jun 2015 10:04:31 -0400 Subject: [PATCH 03/17] Removed use of deprecated newSerializer --- .../com/twitter/chill/ReflectingDefaultRegistrar.java | 2 +- .../java/com/twitter/chill/ReflectingRegistrar.java | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/chill-java/src/main/java/com/twitter/chill/ReflectingDefaultRegistrar.java b/chill-java/src/main/java/com/twitter/chill/ReflectingDefaultRegistrar.java index 8a7db933..7ace7507 100644 --- a/chill-java/src/main/java/com/twitter/chill/ReflectingDefaultRegistrar.java +++ b/chill-java/src/main/java/com/twitter/chill/ReflectingDefaultRegistrar.java @@ -32,7 +32,7 @@ public ReflectingDefaultRegistrar(Class cls, Class> s public Class getRegisteredClass() { return klass; } public Class> getSerializerClass() { return serializerKlass; } @Override - public void apply(Kryo k) { k.addDefaultSerializer(klass, k.newSerializer(serializerKlass, klass)); } + public void apply(Kryo k) { k.addDefaultSerializer(klass, serializerKlass); } @Override public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); } diff --git a/chill-java/src/main/java/com/twitter/chill/ReflectingRegistrar.java b/chill-java/src/main/java/com/twitter/chill/ReflectingRegistrar.java index b144f6a7..e55fa04b 100644 --- a/chill-java/src/main/java/com/twitter/chill/ReflectingRegistrar.java +++ b/chill-java/src/main/java/com/twitter/chill/ReflectingRegistrar.java @@ -18,6 +18,7 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.util.Util; /** Use reflection to instantiate a serializer. * Used when serializer classes are written to config files @@ -34,8 +35,16 @@ public ReflectingRegistrar(Class cls, Class> ser) { klass = cls; serializerKlass = ser; } + @Override - public void apply(Kryo k) { k.register(klass, k.newSerializer(serializerKlass, klass)); } + public void apply(Kryo k) { + try { + k.register(klass, serializerKlass.newInstance()); + } catch (Exception ex) { + throw new IllegalArgumentException("Unable to create serializer \"" + serializerKlass.getName() + "\" for class: " + + Util.className(klass), ex); + } + } @Override public int hashCode() { return klass.hashCode() ^ serializerKlass.hashCode(); } From 7596f6c7e29163665f12472aa972efd0baad6fa2 Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Sun, 7 Jun 2015 10:06:21 -0400 Subject: [PATCH 04/17] Add AnyRef type to generic ObjectInstantiator --- .../src/main/scala/com/twitter/chill/KryoBase.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala index 50f297c8..0a18f196 100644 --- a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala +++ b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala @@ -102,8 +102,8 @@ class KryoBase extends Kryo { object Instantiators { // Go through the list and use the first that works def newOrElse(cls: Class[_], - it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator]], - elsefn: => ObjectInstantiator): ObjectInstantiator = { + it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator[AnyRef]]], + elsefn: => ObjectInstantiator[_]): ObjectInstantiator[_] = { // Just go through and try each one, it.map { fn => fn(cls) match { @@ -117,8 +117,8 @@ object Instantiators { } // Use call by name: - def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator = - new ObjectInstantiator { + def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator[AnyRef] = + new ObjectInstantiator[AnyRef] { override def newInstance() = { try { fn().asInstanceOf[AnyRef] } catch { @@ -130,7 +130,7 @@ object Instantiators { } // This one tries reflectasm, which is a fast way of constructing an object - def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator] = { + def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = { try { val access = ConstructorAccess.get(t) // Try it once, because this isn't always successful: @@ -154,7 +154,7 @@ object Instantiators { } } - def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator] = { + def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = { try { val cons = getConstructor(t) Right(forClass(t) { () => cons.newInstance() }) From a0909b7ff6d14779a7fd91e96d510d0eb6f99a4d Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Sun, 7 Jun 2015 10:08:17 -0400 Subject: [PATCH 05/17] Write long size instead of int, and convert when allocating byte array --- .../main/java/com/twitter/chill/hadoop/KryoDeserializer.java | 2 +- .../src/main/java/com/twitter/chill/hadoop/KryoSerializer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java index 137f050a..517f300e 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java @@ -46,7 +46,7 @@ public void open(InputStream in) throws IOException { public Object deserialize(Object o) throws IOException { // TODO, we could share these buffers if we see that alloc is bottlenecking - byte[] bytes = new byte[inputStream.readInt()]; + byte[] bytes = new byte[(int) inputStream.readLong()]; inputStream.readFully( bytes ); return kryoPool.fromBytes(bytes, klass); } diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java index 81a23350..57a4e969 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java @@ -47,7 +47,7 @@ public void serialize(Object o) throws IOException { try { st.writeObject(o); // Copy from buffer to output stream. - outputStream.writeInt(st.numOfWrittenBytes()); + outputStream.writeLong(st.numOfWrittenBytes()); st.writeOutputTo(outputStream); } finally { From f6d462e040a4d86f8b19a6fe7d28e3b3f21e0e3b Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Mon, 8 Jun 2015 15:19:40 -0400 Subject: [PATCH 06/17] Add Java 8 compilation compatibility --- chill-java/src/main/java/com/twitter/chill/Base64.java | 6 +++--- .../com/twitter/chill/config/ConfiguredInstantiator.java | 2 +- project/Build.scala | 9 ++++++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/chill-java/src/main/java/com/twitter/chill/Base64.java b/chill-java/src/main/java/com/twitter/chill/Base64.java index d1481635..38e4793e 100644 --- a/chill-java/src/main/java/com/twitter/chill/Base64.java +++ b/chill-java/src/main/java/com/twitter/chill/Base64.java @@ -6,7 +6,7 @@ *

Example:

* * String encoded = Base64.encode( myByteArray ); - *
+ *
* byte[] myByteArray = Base64.decode( encoded ); * *

The options parameter, which appears in a few places, is used to pass @@ -1668,7 +1668,7 @@ public InputStream( java.io.InputStream in ) { * Valid options:

          *   ENCODE or DECODE: Encode or Decode as data is read.
          *   DO_BREAK_LINES: break lines at 76 characters
-         *     (only meaningful when encoding)
+         *     (only meaningful when encoding)
          * 
*

* Example: new Base64.InputStream( in, Base64.DECODE ) @@ -1881,7 +1881,7 @@ public OutputStream( java.io.OutputStream out ) { * Valid options:

          *   ENCODE or DECODE: Encode or Decode as data is read.
          *   DO_BREAK_LINES: don't break lines at 76 characters
-         *     (only meaningful when encoding)
+         *     (only meaningful when encoding)
          * 
*

* Example: new Base64.OutputStream( out, Base64.ENCODE ) diff --git a/chill-java/src/main/java/com/twitter/chill/config/ConfiguredInstantiator.java b/chill-java/src/main/java/com/twitter/chill/config/ConfiguredInstantiator.java index 4c6c61ee..91bec774 100644 --- a/chill-java/src/main/java/com/twitter/chill/config/ConfiguredInstantiator.java +++ b/chill-java/src/main/java/com/twitter/chill/config/ConfiguredInstantiator.java @@ -40,7 +40,7 @@ public class ConfiguredInstantiator extends KryoInstantiator { protected final KryoInstantiator delegate; /** Key we use to configure this class. - * Format: (:) + * Format: <class of KryoInstantiator>(:<base64 serialized instantiator>) * if there is no serialized instantiator, we use the reflected instance * as the delegate */ diff --git a/project/Build.scala b/project/Build.scala index a3228430..9658be00 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -27,9 +27,12 @@ object ChillBuild extends Build { scalacOptions ++= Seq("-unchecked", "-deprecation"), ScalariformKeys.preferences := formattingPreferences, - // Twitter Hadoop needs this, sorry 1.7 fans - javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:-options"), - javacOptions in doc := Seq("-source", "1.6"), + javacOptions ++= ( + if (scalaVersion.value.startsWith("2.11")) Seq("-target", "1.8", "-source", "1.8", "-Xlint:-options") + else Seq("-target", "1.7", "-source", "1.7", "-Xlint:-options")), + javacOptions in doc := ( + if (scalaVersion.value.startsWith("2.11")) Seq("-source", "1.8") + else Seq("-source", "1.7")), resolvers ++= Seq( Opts.resolver.sonatypeSnapshots, From defab77f559a5a6cdfe3204c718f3f8f9e98e10c Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Mon, 8 Jun 2015 15:21:06 -0400 Subject: [PATCH 07/17] Added TravisCI Java 8 build --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 30adc747..bfd9bf67 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,3 +3,5 @@ sudo: false scala: - 2.10.4 - 2.11.2 +jdk: + - oraclejdk8 From 3ef80a64cfcba995da37c39130cb1894062b7002 Mon Sep 17 00:00:00 2001 From: Andrew FigPope Date: Tue, 9 Jun 2015 04:39:19 -0400 Subject: [PATCH 08/17] Added Varint / Varlong support --- .../chill/hadoop/KryoDeserializer.java | 2 +- .../twitter/chill/hadoop/KryoSerializer.java | 2 +- .../java/com/twitter/chill/hadoop/Varint.java | 168 ++++++++++++++++++ 3 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java index 517f300e..b469f5f4 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java @@ -46,7 +46,7 @@ public void open(InputStream in) throws IOException { public Object deserialize(Object o) throws IOException { // TODO, we could share these buffers if we see that alloc is bottlenecking - byte[] bytes = new byte[(int) inputStream.readLong()]; + byte[] bytes = new byte[(int) Varint.readUnsignedVarLong(inputStream)]; inputStream.readFully( bytes ); return kryoPool.fromBytes(bytes, klass); } diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java index 57a4e969..4f8e656d 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java @@ -47,7 +47,7 @@ public void serialize(Object o) throws IOException { try { st.writeObject(o); // Copy from buffer to output stream. - outputStream.writeLong(st.numOfWrittenBytes()); + Varint.writeUnsignedVarLong(st.numOfWrittenBytes(), outputStream); st.writeOutputTo(outputStream); } finally { diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java new file mode 100644 index 00000000..2a856e9f --- /dev/null +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Taken from org.apache.mahout.math + * https://github.com/apache/mahout + */ + +package com.twitter.chill.hadoop; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +/** + *

Encodes signed and unsigned values using a common variable-length + * scheme, found for example in + * + * Google's Protocol Buffers. It uses fewer bytes to encode smaller values, + * but will use slightly more bytes to encode large values.

+ * + *

Signed values are further encoded using so-called zig-zag encoding + * in order to make them "compatible" with variable-length encoding.

+ */ +final class Varint { + + private Varint() { + } + + /** + * Encodes a value using the variable-length encoding from + * + * Google Protocol Buffers. It uses zig-zag encoding to efficiently + * encode signed values. If values are known to be nonnegative, + * {@link #writeUnsignedVarLong(long, java.io.DataOutputStream)} should be used. + * + * @param value value to encode + * @param out to write bytes to + * @throws java.io.IOException if {@link java.io.DataOutput} throws {@link java.io.IOException} + */ + public static void writeSignedVarLong(long value, DataOutputStream out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarLong((value << 1) ^ (value >> 63), out); + } + + /** + * Encodes a value using the variable-length encoding from + * + * Google Protocol Buffers. Zig-zag is not used, so input must not be negative. + * If values can be negative, use {@link #writeSignedVarLong(long, java.io.DataOutputStream)} + * instead. This method treats negative input as like a large unsigned value. + * + * @param value value to encode + * @param out to write bytes to + * @throws java.io.IOException if {@link java.io.DataOutputStream} throws {@link java.io.IOException} + */ + public static void writeUnsignedVarLong(long value, DataOutputStream out) throws IOException { + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + out.writeByte(((int) value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte((int) value & 0x7F); + } + + /** + * @see #writeSignedVarLong(long, java.io.DataOutputStream) + */ + public static void writeSignedVarInt(int value, DataOutputStream out) throws IOException { + // Great trick from http://code.google.com/apis/protocolbuffers/docs/encoding.html#types + writeUnsignedVarInt((value << 1) ^ (value >> 31), out); + } + + /** + * @see #writeUnsignedVarLong(long, java.io.DataOutputStream) + */ + public static void writeUnsignedVarInt(int value, DataOutputStream out) throws IOException { + while ((value & 0xFFFFFF80) != 0L) { + out.writeByte((value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte(value & 0x7F); + } + + /** + * @param in to read bytes from + * @return decode value + * @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException} + * @throws IllegalArgumentException if variable-length value does not terminate + * after 9 bytes have been read + * @see #writeSignedVarLong(long, java.io.DataOutputStream) + */ + public static long readSignedVarLong(DataInputStream in) throws IOException { + long raw = readUnsignedVarLong(in); + // This undoes the trick in writeSignedVarLong() + long temp = (((raw << 63) >> 63) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as like unsigned values + // Must re-flip the top bit if the original read value had it set. + return temp ^ (raw & (1L << 63)); + } + + /** + * @param in to read bytes from + * @return decode value + * @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException} + * @throws IllegalArgumentException if variable-length value does not terminate + * after 9 bytes have been read + * @see #writeUnsignedVarLong(long, java.io.DataOutputStream) + */ + public static long readUnsignedVarLong(DataInputStream in) throws IOException { + long value = 0L; + int i = 0; + long b; + while (((b = in.readByte()) & 0x80L) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate + * after 5 bytes have been read + * @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException} + * @see #readSignedVarLong(java.io.DataInputStream) + */ + public static int readSignedVarInt(DataInputStream in) throws IOException { + int raw = readUnsignedVarInt(in); + // This undoes the trick in writeSignedVarInt() + int temp = (((raw << 31) >> 31) ^ raw) >> 1; + // This extra step lets us deal with the largest signed values by treating + // negative results from read unsigned methods as like unsigned values. + // Must re-flip the top bit if the original read value had it set. + return temp ^ (raw & (1 << 31)); + } + + /** + * @throws IllegalArgumentException if variable-length value does not terminate + * after 5 bytes have been read + * @throws java.io.IOException if {@link java.io.DataInput} throws {@link java.io.IOException} + * @see #readUnsignedVarLong(java.io.DataInputStream) + */ + public static int readUnsignedVarInt(DataInputStream in) throws IOException { + int value = 0; + int i = 0; + int b; + while (((b = in.readByte()) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + +} \ No newline at end of file From cd993e34fc2b1e864b5f19301e16616eefa62b89 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Nov 2015 11:34:44 -0800 Subject: [PATCH 09/17] Add versions --- .travis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index bfd9bf67..a942a1b9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,8 @@ language: scala sudo: false scala: - - 2.10.4 - - 2.11.2 + - 2.10.5 + - 2.11.7 jdk: + - oraclejdk7 - oraclejdk8 From 8e8ab865e1bfc823402ce2b2f07f98c632656e07 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Nov 2015 11:38:30 -0800 Subject: [PATCH 10/17] Bump to latest kryo --- project/Build.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Build.scala b/project/Build.scala index c4a272d4..28478f0b 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -10,8 +10,8 @@ import com.typesafe.sbt.SbtScalariform._ import scala.collection.JavaConverters._ object ChillBuild extends Build { - val kryoVersion = "3.0.1" + val kryoVersion = "3.0.3" val bijectionVersion = "0.8.1" val algebirdVersion = "0.11.0" From b51eee4c7b5ac139809a808de3242f211c6eaf75 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Mon, 16 Nov 2015 11:46:36 -0800 Subject: [PATCH 11/17] remove java version settings --- project/Build.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 28478f0b..e7587574 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -27,12 +27,8 @@ object ChillBuild extends Build { scalacOptions ++= Seq("-unchecked", "-deprecation"), ScalariformKeys.preferences := formattingPreferences, - javacOptions ++= ( - if (scalaVersion.value.startsWith("2.11")) Seq("-target", "1.8", "-source", "1.8", "-Xlint:-options") - else Seq("-target", "1.7", "-source", "1.7", "-Xlint:-options")), - javacOptions in doc := ( - if (scalaVersion.value.startsWith("2.11")) Seq("-source", "1.8") - else Seq("-source", "1.7")), + javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:-options"), + javacOptions in doc := Seq("-source", "1.7"), resolvers ++= Seq( Opts.resolver.sonatypeSnapshots, From 5bf625ce94e467c7b8309486ecc8efcaa44ccee8 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 9 Feb 2016 14:02:24 -0800 Subject: [PATCH 12/17] Change version to be a snapshot one. Review comments --- .../src/main/scala/com/twitter/chill/KryoBase.scala | 12 ++++++------ version.sbt | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala index 19072dac..b3ab88c7 100644 --- a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala +++ b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala @@ -93,7 +93,7 @@ class KryoBase extends Kryo { override def newInstantiator(cls: Class[_]) = { import Instantiators._ newOrElse(cls, - List(reflectAsm _, normalJava _), + List(reflectAsm(_), normalJava(_)), // Or fall back on the strategy: tryStrategy(cls).newInstantiatorOf(cls)) } @@ -101,7 +101,7 @@ class KryoBase extends Kryo { object Instantiators { // Go through the list and use the first that works - def newOrElse(cls: Class[_], + def newOrElse[T](cls: Class[T], it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator[AnyRef]]], elsefn: => ObjectInstantiator[_]): ObjectInstantiator[_] = { // Just go through and try each one, @@ -117,7 +117,7 @@ object Instantiators { } // Use call by name: - def forClass(t: Class[_])(fn: () => Any): ObjectInstantiator[AnyRef] = + def forClass[T](t: Class[T])(fn: () => T): ObjectInstantiator[AnyRef] = new ObjectInstantiator[AnyRef] { override def newInstance() = { try { fn().asInstanceOf[AnyRef] } @@ -130,7 +130,7 @@ object Instantiators { } // This one tries reflectasm, which is a fast way of constructing an object - def reflectAsm(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = { + def reflectAsm[T](t: Class[T]): Either[Throwable, ObjectInstantiator[AnyRef]] = { try { val access = ConstructorAccess.get(t) // Try it once, because this isn't always successful: @@ -142,7 +142,7 @@ object Instantiators { } } - def getConstructor(c: Class[_]): Constructor[_] = { + def getConstructor[T](c: Class[T]): Constructor[T] = { try { c.getConstructor() } catch { @@ -154,7 +154,7 @@ object Instantiators { } } - def normalJava(t: Class[_]): Either[Throwable, ObjectInstantiator[AnyRef]] = { + def normalJava[T](t: Class[T]): Either[Throwable, ObjectInstantiator[AnyRef]] = { try { val cons = getConstructor(t) Right(forClass(t) { () => cons.newInstance() }) diff --git a/version.sbt b/version.sbt index 21ea1e3b..48bb0f19 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.7.3" \ No newline at end of file +version in ThisBuild := "0.8.0-SNAPSHOT" \ No newline at end of file From 29ad0db32d223cc3ad0fb094f67d7d40918ac303 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Tue, 9 Feb 2016 14:08:51 -0800 Subject: [PATCH 13/17] Get types on other methods --- .../scala/com/twitter/chill/KryoBase.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala index b3ab88c7..9b85c609 100644 --- a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala +++ b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala @@ -90,10 +90,12 @@ class KryoBase extends Kryo { /* Fixes the case where Kryo's reflectasm doesn't work, even though it claims to * TODO this should be fixed in Kryo. When it is, remove this */ - override def newInstantiator(cls: Class[_]) = { + override def newInstantiator(cls: Class[_]) = newTypedInstantiator[AnyRef](cls.asInstanceOf[Class[AnyRef]]) + + private[this] def newTypedInstantiator[T](cls: Class[T]) = { import Instantiators._ newOrElse(cls, - List(reflectAsm(_), normalJava(_)), + List(reflectAsm[T](_), normalJava[T](_)), // Or fall back on the strategy: tryStrategy(cls).newInstantiatorOf(cls)) } @@ -102,8 +104,8 @@ class KryoBase extends Kryo { object Instantiators { // Go through the list and use the first that works def newOrElse[T](cls: Class[T], - it: TraversableOnce[Class[_] => Either[Throwable, ObjectInstantiator[AnyRef]]], - elsefn: => ObjectInstantiator[_]): ObjectInstantiator[_] = { + it: TraversableOnce[Class[T] => Either[Throwable, ObjectInstantiator[T]]], + elsefn: => ObjectInstantiator[T]): ObjectInstantiator[T] = { // Just go through and try each one, it.map { fn => fn(cls) match { @@ -117,10 +119,10 @@ object Instantiators { } // Use call by name: - def forClass[T](t: Class[T])(fn: () => T): ObjectInstantiator[AnyRef] = - new ObjectInstantiator[AnyRef] { + def forClass[T](t: Class[T])(fn: () => T): ObjectInstantiator[T] = + new ObjectInstantiator[T] { override def newInstance() = { - try { fn().asInstanceOf[AnyRef] } + try { fn() } catch { case x: Exception => { throw new KryoException("Error constructing instance of class: " + t.getName, x) @@ -130,7 +132,7 @@ object Instantiators { } // This one tries reflectasm, which is a fast way of constructing an object - def reflectAsm[T](t: Class[T]): Either[Throwable, ObjectInstantiator[AnyRef]] = { + def reflectAsm[T](t: Class[T]): Either[Throwable, ObjectInstantiator[T]] = { try { val access = ConstructorAccess.get(t) // Try it once, because this isn't always successful: @@ -154,7 +156,7 @@ object Instantiators { } } - def normalJava[T](t: Class[T]): Either[Throwable, ObjectInstantiator[AnyRef]] = { + def normalJava[T](t: Class[T]): Either[Throwable, ObjectInstantiator[T]] = { try { val cons = getConstructor(t) Right(forClass(t) { () => cons.newInstance() }) From d0dd574c42b410d218211a6a6ee7687e84a721d3 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 10 Feb 2016 08:03:40 -0800 Subject: [PATCH 14/17] Use Try rather than Either --- .../scala/com/twitter/chill/KryoBase.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala index 9b85c609..7a4d9a5a 100644 --- a/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala +++ b/chill-scala/src/main/scala/com/twitter/chill/KryoBase.scala @@ -25,6 +25,8 @@ import org.objenesis.strategy.InstantiatorStrategy import _root_.java.lang.reflect.{ Constructor, Modifier } +import scala.util.{ Try, Success, Failure } + /* * This is the base class of Kryo we use to fix specific scala * related issues discovered (ideally, this should be fixed in Kryo) @@ -104,17 +106,15 @@ class KryoBase extends Kryo { object Instantiators { // Go through the list and use the first that works def newOrElse[T](cls: Class[T], - it: TraversableOnce[Class[T] => Either[Throwable, ObjectInstantiator[T]]], + it: TraversableOnce[Class[T] => Try[ObjectInstantiator[T]]], elsefn: => ObjectInstantiator[T]): ObjectInstantiator[T] = { // Just go through and try each one, - it.map { fn => - fn(cls) match { - case Left(x) => None // ignore the exception - case Right(obji) => Some(obji) + + it + .flatMap { fn => + fn(cls).toOption } - } - .find { _.isDefined } // Find the first Some(x), returns Some(Some(x)) - .flatMap { x => x } // flatten + .find (_ => true) // first element in traversable once (no headOption defined.) .getOrElse(elsefn) } @@ -132,15 +132,15 @@ object Instantiators { } // This one tries reflectasm, which is a fast way of constructing an object - def reflectAsm[T](t: Class[T]): Either[Throwable, ObjectInstantiator[T]] = { + def reflectAsm[T](t: Class[T]): Try[ObjectInstantiator[T]] = { try { val access = ConstructorAccess.get(t) // Try it once, because this isn't always successful: access.newInstance // Okay, looks good: - Right(forClass(t) { () => access.newInstance() }) + Success(forClass(t) { () => access.newInstance() }) } catch { - case x: Throwable => Left(x) + case x: Throwable => Failure(x) } } @@ -156,12 +156,12 @@ object Instantiators { } } - def normalJava[T](t: Class[T]): Either[Throwable, ObjectInstantiator[T]] = { + def normalJava[T](t: Class[T]): Try[ObjectInstantiator[T]] = { try { val cons = getConstructor(t) - Right(forClass(t) { () => cons.newInstance() }) + Success(forClass(t) { () => cons.newInstance() }) } catch { - case x: Throwable => Left(x) + case x: Throwable => Failure(x) } } } From e0b809e1f937d88bf53c84e96f24189b182c4eff Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 10 Feb 2016 08:04:00 -0800 Subject: [PATCH 15/17] Number of bytes read or written should be an int, so return that --- chill-java/src/main/java/com/twitter/chill/SerDeState.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chill-java/src/main/java/com/twitter/chill/SerDeState.java b/chill-java/src/main/java/com/twitter/chill/SerDeState.java index 640ba6f0..b35d1898 100644 --- a/chill-java/src/main/java/com/twitter/chill/SerDeState.java +++ b/chill-java/src/main/java/com/twitter/chill/SerDeState.java @@ -50,8 +50,8 @@ public void clear() { public void setInput(byte[] in, int offset, int count) { input.setBuffer(in, offset, count); } public void setInput(InputStream in) { input.setInputStream(in); } - public long numOfWrittenBytes() { return output.total(); } - public long numOfReadBytes() { return input.total(); } + public int numOfWrittenBytes() { return (int)output.total(); } + public int numOfReadBytes() { return (int)input.total(); } // Common operations: public T readObject(Class cls) { From a885f3dfbb60edbc7e4cf6842b55c10fef956595 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 10 Feb 2016 08:04:17 -0800 Subject: [PATCH 16/17] Validate that we don't over-read for a var int --- .../src/main/java/com/twitter/chill/hadoop/Varint.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java index 2a856e9f..08ca65bb 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/Varint.java @@ -158,10 +158,13 @@ public static int readUnsignedVarInt(DataInputStream in) throws IOException { int value = 0; int i = 0; int b; - while (((b = in.readByte()) & 0x80) != 0) { + while (((b = in.readByte()) & 0x80) != 0 && i < 42) { value |= (b & 0x7F) << i; i += 7; } + if(i == 42) { // Over read! + throw new IllegalArgumentException("Read more than 5 bytes of data, must be invalid Var int"); + } return value | (b << i); } From b234e46ba2c1eaf9d5a6c730dc35a23f1e918d25 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Wed, 10 Feb 2016 08:04:35 -0800 Subject: [PATCH 17/17] Read/write var ints instead of longs for the object lengths --- .../main/java/com/twitter/chill/hadoop/KryoDeserializer.java | 2 +- .../src/main/java/com/twitter/chill/hadoop/KryoSerializer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java index b469f5f4..4012bfb9 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoDeserializer.java @@ -46,7 +46,7 @@ public void open(InputStream in) throws IOException { public Object deserialize(Object o) throws IOException { // TODO, we could share these buffers if we see that alloc is bottlenecking - byte[] bytes = new byte[(int) Varint.readUnsignedVarLong(inputStream)]; + byte[] bytes = new byte[Varint.readUnsignedVarInt(inputStream)]; inputStream.readFully( bytes ); return kryoPool.fromBytes(bytes, klass); } diff --git a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java index 4f8e656d..e16517a3 100644 --- a/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java +++ b/chill-hadoop/src/main/java/com/twitter/chill/hadoop/KryoSerializer.java @@ -47,7 +47,7 @@ public void serialize(Object o) throws IOException { try { st.writeObject(o); // Copy from buffer to output stream. - Varint.writeUnsignedVarLong(st.numOfWrittenBytes(), outputStream); + Varint.writeUnsignedVarInt(st.numOfWrittenBytes(), outputStream); st.writeOutputTo(outputStream); } finally {