Skip to content

Commit

Permalink
Allow both spark.kryo.classesToRegister and spark.kryo.registrator at…
Browse files Browse the repository at this point in the history
… the same time
  • Loading branch information
sryza committed Oct 13, 2014
1 parent 6a15bb7 commit b824932
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class KryoSerializer(conf: SparkConf)
private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val userRegistrator = conf.getOption("spark.kryo.registrator")
private val userRegistratorName = conf.getOption("spark.kryo.registrator")
private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
.split(',')
.filter(!_.isEmpty)
Expand Down Expand Up @@ -91,20 +91,26 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())

// Allow the user to register their own classes by setting spark.kryo.registrator
try {
val reg = userRegistrator
.map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
.getOrElse(new DefaultKryoRegistrator(classesToRegister))
logDebug("Running Kryo registrator: " + reg.getClass.getName)
// Allow the user to register their own classes by setting spark.kryo.registrator.
val userRegistrator = userRegistratorName.map( reg => try {
Class.forName(reg, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
} catch {
case e: Exception => {
throw new SparkException(s"Failed to load user registrator " +
userRegistratorName.getOrElse(""), e)
}
})

try {
// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
reg.registerClasses(kryo)
// Register classes given through spark.kryo.classesToRegister.
classesToRegister.foreach { clazz => kryo.register(clazz) }
// Call user registrator.
userRegistrator.foreach { reg => reg.registerClasses(kryo) }
} catch {
case e: Exception =>
throw new SparkException(s"Failed to invoke registrator " +
userRegistrator.getOrElse(""), e)
throw new SparkException(s"Failed to register classes with Kryo", e)
} finally {
Thread.currentThread.setContextClassLoader(oldClassLoader)
}
Expand All @@ -117,6 +123,20 @@ class KryoSerializer(conf: SparkConf)
kryo
}

private def registerClasses(registrator: KryoRegistrator, classLoader: ClassLoader, kryo: Kryo) {
val oldClassLoader = Thread.currentThread.getContextClassLoader
try {
// Use the default classloader when calling the user registrator.
Thread.currentThread.setContextClassLoader(classLoader)
registrator.registerClasses(kryo)
} catch {
case e: Exception =>
throw new SparkException(s"Failed to invoke registrator " + registrator.getClass.getName, e)
} finally {
Thread.currentThread.setContextClassLoader(oldClassLoader)
}
}

override def newInstance(): SerializerInstance = {
new KryoSerializerInstance(this)
}
Expand Down Expand Up @@ -248,12 +268,6 @@ private class JavaIterableWrapperSerializer
}
}

private class DefaultKryoRegistrator(classes: Seq[Class[_ <: Any]]) extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
classes.foreach { clazz => kryo.register(clazz) }
}
}

private object JavaIterableWrapperSerializer extends Logging {
// The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper).
val wrapperClass =
Expand Down
24 changes: 23 additions & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
test("loading from system properties") {
Expand Down Expand Up @@ -158,6 +159,21 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
serializer.newInstance().serialize(new Class3())
}

test("register kryo classes through registerKryoClasses and custom registrator") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

conf.registerKryoClasses(Array(classOf[Class1]))
assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName)

conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

// Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't
// blow up.
val serializer = new KryoSerializer(conf)
serializer.newInstance().serialize(new Class1())
serializer.newInstance().serialize(new Class2())
}

test("register kryo classes through conf") {
val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
Expand All @@ -174,3 +190,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
class Class1 {}
class Class2 {}
class Class3 {}

class CustomRegistrator extends KryoRegistrator {
def registerClasses(kryo: Kryo) {
kryo.register(classOf[Class2])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
conf.set("spark.kryo.registrator", "this.class.does.not.exist")

val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance())
assert(thrown.getMessage.contains("Failed to invoke registrator this.class.does.not.exist"))
assert(thrown.getMessage.contains("Failed to load user registrator this.class.does.not.exist"))
}

test("default class loader can be set by a different thread") {
Expand Down
3 changes: 1 addition & 2 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ of the most common options to set are:
<td>(none)</td>
<td>
If you use Kryo serialization, give a comma-separated list of custom class names to register
with Kryo. If a custom registrator is given through <code>spark.kryo.registrator</code> it
overrides any classes specified through this property.
with Kryo.
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
</tr>
Expand Down

0 comments on commit b824932

Please sign in to comment.