[SPARK-12197] [SparkCore] Kryo & Avro - Support Schema Repo #13761

Closed · wants to merge 3 commits
KryoSerializer.scala
@@ -37,6 +37,7 @@ import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.serializer.avro.{GenericAvroSerializer, EmptySchemaRepo, SchemaRepo}
import org.apache.spark.storage._
import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
import org.apache.spark.util.collection.CompactBuffer
@@ -109,8 +110,9 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
val schemaRepo = SchemaRepo(conf).getOrElse(EmptySchemaRepo)
kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas, schemaRepo))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas, schemaRepo))

try {
// scalastyle:off classforname
@@ -185,8 +187,8 @@ class KryoSerializer(conf: SparkConf)

private[spark]
class KryoSerializationStream(
serInstance: KryoSerializerInstance,
outStream: OutputStream) extends SerializationStream {
serInstance: KryoSerializerInstance,
outStream: OutputStream) extends SerializationStream {

private[this] var output: KryoOutput = new KryoOutput(outStream)
private[this] var kryo: Kryo = serInstance.borrowKryo()
@@ -218,8 +220,8 @@ class KryoSerializationStream(

private[spark]
class KryoDeserializationStream(
serInstance: KryoSerializerInstance,
inStream: InputStream) extends DeserializationStream {
serInstance: KryoSerializerInstance,
inStream: InputStream) extends DeserializationStream {

private[this] var input: KryoInput = new KryoInput(inStream)
private[this] var kryo: Kryo = serInstance.borrowKryo()
@@ -465,7 +467,7 @@ private class JavaIterableWrapperSerializer
}

override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]])
: java.lang.Iterable[_] = {
: java.lang.Iterable[_] = {
kryo.readClassAndObject(in) match {
case scalaIterable: Iterable[_] => scalaIterable.asJava
case javaIterable: java.lang.Iterable[_] => javaIterable
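The registration change above wires a single SchemaRepo instance, built once from the SparkConf, into both GenericRecord registrations. A minimal usage sketch, assuming a hypothetical com.example.MyAppSchemaRepo implementation (spark.kryo.avro.schema.repo is the configuration key introduced by this patch; registerAvroSchemas is the existing SparkConf API for pre-registering schemas):

import org.apache.avro.Schema
import org.apache.spark.SparkConf

// Pre-registered schemas travel as 64-bit fingerprints instead of the full schema JSON.
val userSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)
  // Hypothetical class implementing org.apache.spark.serializer.avro.SchemaRepo;
  // acts as a fallback for records whose schemas were not pre-registered.
  .set("spark.kryo.avro.schema.repo", "com.example.MyAppSchemaRepo")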
GenericAvroSerializer.scala
@@ -15,25 +15,25 @@
* limitations under the License.
*/

package org.apache.spark.serializer
package org.apache.spark.serializer.avro

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}
import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.commons.io.IOUtils

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.util.Utils

import scala.collection.mutable

/**
* Custom serializer used for generic Avro records. If the user registers the schemas
* ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
@@ -44,7 +44,8 @@ import org.apache.spark.util.Utils
* string representation of the Avro schema, used to decrease the amount of data
* that needs to be serialized.
*/
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String],
schemaRepo: SchemaRepo = EmptySchemaRepo)
extends KSerializer[GenericRecord] {

/** Used to reduce the amount of effort to compress the schema */
@@ -106,6 +107,40 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
val encoder = EncoderFactory.get.binaryEncoder(output, null)
val schema = datum.getSchema

serializeSchema(datum, schema, output)

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}

/**
* Deserializes generic records into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
*/
def deserializeDatum(input: KryoInput): GenericRecord = {
val schema: Schema = deserializeSchema(input)

val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
}


/**
* Serialize the schema of a record.
* Step 1: Calculate the schema's fingerprint using Avro's SchemaNormalization mechanism.
* Step 2: Use the fingerprint to look up the schema among the pre-registered schemas; if found, serialize the fingerprint, otherwise go to step 3.
* Step 3: Use the SchemaRepo to find the schemaId of the record; if found, serialize the schemaId in place of the fingerprint, otherwise go to step 4.
* Step 4: Serialize the entire schema, with an indicator flag marking this case.
* @param datum - datum to extract the schema id from
* @param schema - schema to serialize
* @param output - kryo output
*/
private def serializeSchema[R <: GenericRecord](datum: R, schema: Schema, output: KryoOutput) = {
val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
SchemaNormalization.parsingFingerprint64(schema)
})
@@ -114,50 +149,61 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
output.writeBoolean(true)
output.writeLong(fingerprint)
case None =>
output.writeBoolean(false)
val compressedSchema = compress(schema)
output.writeInt(compressedSchema.length)
output.writeBytes(compressedSchema)
schemaRepo.extractSchemaId(datum) match {
case Some(schemaId) if schemaRepo.contains(schemaId) =>
output.writeBoolean(true)
output.writeLong(schemaId)
case _ =>
output.writeBoolean(false)
val compressedSchema = compress(schema)
output.writeInt(compressedSchema.length)
output.writeBytes(compressedSchema)
}
}

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}
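As a standalone illustration of step 1 above, the value cached in fingerprintCache is Avro's 64-bit parsing fingerprint of the schema; a minimal sketch (the example schema is arbitrary):

import org.apache.avro.{Schema, SchemaNormalization}

val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")

// The same function the serializer uses to key its caches and the pre-registered schema map.
val fingerprint: Long = SchemaNormalization.parsingFingerprint64(schema)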


/**
* Deserializes generic records into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
* Deserialize a schema.
* If the fingerprint indicator boolean is false:
* Deserialize the schema itself from the bytes that follow.
* If the fingerprint indicator boolean is true:
* Step 1: Search for the schema among the explicitly registered schemas using the fingerprint. If the schema is not found, move to step 2.
* Step 2: Search the schema repository using the fingerprint. If the schema is still not found, throw an exception.
* @param input KryoInput
* @return the deserialized schema
*/
def deserializeDatum(input: KryoInput): GenericRecord = {
private def deserializeSchema(input: KryoInput): Schema = {
val schema = {
if (input.readBoolean()) {
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
schemas.get(fingerprint) match {
case Some(s) => new Schema.Parser().parse(s)
case Some(s) => new Parser().parse(s)
case None =>
throw new SparkException(
"Error reading attempting to read avro data -- encountered an unknown " +
s"fingerprint: $fingerprint, not sure what schema to use. This could happen " +
"if you registered additional schemas after starting your spark context.")
schemaRepo.getSchema(fingerprint) match {
case Some(res_schema) => res_schema
case None =>
throw new SparkException(
s"""Error reading attempting to read avro data --
|encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
|This could happen if you registered additional schemas after starting your
|spark context.""".stripMargin)
}

}
})
} else {
val length = input.readInt()
decompress(ByteBuffer.wrap(input.readBytes(length)))
}
}
val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
schema
}
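The boolean written by serializeSchema is the only framing on the wire: true is followed by a single long (a pre-registered fingerprint or a repository schemaId), false by an int length and the compressed schema bytes. A sketch of reading that framing with Kryo's Input, for illustration only:

import com.esotericsoftware.kryo.io.{Input => KryoInput}

def describeSchemaFraming(bytes: Array[Byte]): String = {
  val in = new KryoInput(bytes)
  if (in.readBoolean()) {
    // Fingerprint / schema-repo id path: a single long follows.
    s"fingerprint or schemaId = ${in.readLong()}"
  } else {
    // Full-schema path: length-prefixed, compressed schema bytes follow.
    val length = in.readInt()
    s"compressed schema of ${in.readBytes(length).length} bytes"
  }
}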

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
serializeDatum(datum, output)

override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
deserializeDatum(input)
}
}
SchemaRepo.scala (new file)
@@ -0,0 +1,87 @@
package org.apache.spark.serializer.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
* Created by rotems on 12/6/15.
*/
/**
* A schema repository for avro records.
* This repo assumes that it is possible to extract the schemaId of a record from the record itself.
* @param config sparkConf for configuration purposes
*/
abstract class SchemaRepo(config: SparkConf) {

/**
* Retrieve an Avro schema from the repo, as a string, by its ID.
* @param schemaId - the schemaId
* @return schema if found, none otherwise
*/
def getRawSchema(schemaId : Long) : Option[String]

/**
* Extract schemaId from record.
* @param record current avro record
* @return schemaId if managed to extract, none otherwise
*/
def extractSchemaId(record: GenericRecord) : Option[Long]

/**
* Checks whether the schema repository contains the given schemaId.
* @param schemaId - the schemaId
* @return true if found in repo, false otherwise.
*/
def contains(schemaId: Long) : Boolean

/**
* Get a schema from the repo by schemaId, parsed as an Avro Schema.
* @param schemaId - the schemaId
* @return schema if found, none otherwise
*/
def getSchema(schemaId : Long) : Option[Schema] = {
getRawSchema(schemaId) match {
case Some(s) => Some(new Schema.Parser().parse(s))
case None => None

}
}

}

object SchemaRepo extends Logging {
val SCHEMA_REPO_KEY = "spark.kryo.avro.schema.repo"

/**
* Create a SchemaRepo using SparkConf.
* @param conf - spark conf used to configure the repo.
* @return the instantiated SchemaRepo, or None if anything goes wrong
*/
def apply(conf: SparkConf) : Option[SchemaRepo]= {
try {
conf.getOption(SCHEMA_REPO_KEY) match {
case Some(clazz) => Some(Class.forName(clazz).getConstructor(classOf[SparkConf])
.newInstance(conf).asInstanceOf[SchemaRepo])
case None => None
}
} catch {
case t: Throwable =>
log.error(s"Failed to build schema repo. ", t)
None
}
}
}

/**
* A dummy empty schema repository.
*/
object EmptySchemaRepo extends SchemaRepo(null) {

override def getRawSchema(schemaId: Long): Option[String] = None

override def extractSchemaId(record: GenericRecord): Option[Long] = None

override def contains(schemaId: Long): Boolean = false
}
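A concrete repository subclasses SchemaRepo and must expose a single-argument SparkConf constructor so that SchemaRepo(conf) above can instantiate it reflectively. A minimal sketch of a hypothetical in-memory implementation that reads the schema id from a 'schemaId' field on each record (the class name, field name, and schema below are illustrative only, not part of this patch):

package com.example

import org.apache.avro.generic.GenericRecord
import org.apache.spark.SparkConf
import org.apache.spark.serializer.avro.SchemaRepo

class MyAppSchemaRepo(conf: SparkConf) extends SchemaRepo(conf) {

  // Hard-coded registry for illustration; a real repo would query an external schema service.
  private val schemasById: Map[Long, String] = Map(
    1L -> """{"type":"record","name":"User","fields":[{"name":"schemaId","type":"long"},{"name":"name","type":"string"}]}""")

  override def getRawSchema(schemaId: Long): Option[String] = schemasById.get(schemaId)

  override def contains(schemaId: Long): Boolean = schemasById.contains(schemaId)

  // Assumes every record carries its own schema id in a 'schemaId' field.
  override def extractSchemaId(record: GenericRecord): Option[Long] =
    Option(record.get("schemaId")).map(_.asInstanceOf[Long])
}

Such a repository would then be enabled with spark.kryo.avro.schema.repo=com.example.MyAppSchemaRepo, as shown in the KryoSerializer section above.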