Skip to content

Commit

Permalink
Merge pull request #1878 from TouK/avro-fix-reverted
Browse files Browse the repository at this point in the history
Problem with generic/specific record serialization to Array[Byte] - a…
  • Loading branch information
arkadius authored Jul 5, 2021
2 parents fcd138e + 9948c80 commit 2c82230
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,38 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
import io.circe.Json
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import org.apache.avro.Schema
import org.apache.avro.Schema.Type
import org.apache.avro.io.DecoderFactory
import org.apache.avro.generic.GenericData
import org.apache.avro.io.{DatumReader, DecoderFactory}
import org.apache.avro.util.Utf8
import org.apache.avro.Schema
import org.apache.kafka.common.errors.SerializationException
import pl.touk.nussknacker.engine.avro.AvroUtils
import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer

import scala.reflect.ClassTag

/**
* @param schemaRegistryClient schema registry client
*/
private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {
extends AbstractKafkaAvroSerializer {

schemaRegistry = schemaRegistryClient

private val decoderFactory = DecoderFactory.get

def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
def readJson(jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
try {
val avroObj = jsonToAvro[T](jsonObj, schema)
writeData(avroObj, schema, schemaId)
val avroObj = jsonToAvro(jsonObj, schema)
serializeImpl(subject, avroObj, new AvroSchema(schema))
} catch {
case ex: Exception =>
throw new SerializationException("Error reading from input", ex)
}
}

private def jsonToAvro[T: ClassTag](jsonObj: Json, schema: Schema): AnyRef = {
private def jsonToAvro(jsonObj: Json, schema: Schema): AnyRef = {
val jsonString = jsonObj.noSpaces
try {
val reader = createDatumReader(schema, schema, useSchemaReflection = false, useSpecificAvroReader = AvroUtils.isSpecificRecord[T])
val reader: DatumReader[AnyRef] = GenericData.get().createDatumReader(schema).asInstanceOf[DatumReader[AnyRef]]
val obj = reader.read(null, decoderFactory.jsonDecoder(schema, jsonString))
if (schema.getType == Type.STRING)
obj.asInstanceOf[Utf8].toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
case None => null
}
} else {
val keySchemaId = record.keySchemaId.getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
val keySchema = getSchemaById(keySchemaId)
keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchemaId, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
keyOpt.map(keyJson => messageReader.readJson(keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
}
val valueSchema = getSchemaById(record.valueSchemaId)
val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
val valueBytes = messageReader.readJson(value, valueSchema, ConfluentUtils.valueSubject(topic))
(keyBytes, valueBytes)
}
record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
Expand Down

0 comments on commit 2c82230

Please sign in to comment.