From 9948c8025dcb1b3c7610729efd36d8e6cad28b89 Mon Sep 17 00:00:00 2001
From: Arek Burdach
Date: Mon, 5 Jul 2021 11:45:22 +0200
Subject: [PATCH] Problem with generic/specific record serialization to Array[byte] - approach that doesn't do anything with schema evolution

---
 .../ConfluentAvroMessageReader.scala          | 23 ++++++++++-------------
 .../ConfluentAvroToJsonFormatterFactory.scala |  7 +++----
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
index f8bb9420496..b0bd8bc45dd 100644
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
+++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
@@ -3,41 +3,38 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
 import io.circe.Json
 import io.confluent.kafka.schemaregistry.avro.AvroSchema
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
+import org.apache.avro.Schema
 import org.apache.avro.Schema.Type
-import org.apache.avro.io.DecoderFactory
+import org.apache.avro.generic.GenericData
+import org.apache.avro.io.{DatumReader, DecoderFactory}
 import org.apache.avro.util.Utf8
-import org.apache.avro.Schema
 import org.apache.kafka.common.errors.SerializationException
-import pl.touk.nussknacker.engine.avro.AvroUtils
-import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer
-
-import scala.reflect.ClassTag
 
 /**
   * @param schemaRegistryClient schema registry client
   */
 private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
-  extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {
+  extends AbstractKafkaAvroSerializer {
 
   schemaRegistry = schemaRegistryClient
 
   private val decoderFactory = DecoderFactory.get
 
-  def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
+  def readJson(jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
     try {
-      val avroObj = jsonToAvro[T](jsonObj, schema)
-      writeData(avroObj, schema, schemaId)
+      val avroObj = jsonToAvro(jsonObj, schema)
+      serializeImpl(subject, avroObj, new AvroSchema(schema))
     } catch {
       case ex: Exception =>
         throw new SerializationException("Error reading from input", ex)
     }
   }
 
-  private def jsonToAvro[T: ClassTag](jsonObj: Json, schema: Schema): AnyRef = {
+  private def jsonToAvro(jsonObj: Json, schema: Schema): AnyRef = {
     val jsonString = jsonObj.noSpaces
     try {
-      val reader = createDatumReader(schema, schema, useSchemaReflection = false, useSpecificAvroReader = AvroUtils.isSpecificRecord[T])
+      val reader: DatumReader[AnyRef] = GenericData.get().createDatumReader(schema).asInstanceOf[DatumReader[AnyRef]]
       val obj = reader.read(null, decoderFactory.jsonDecoder(schema, jsonString))
       if (schema.getType == Type.STRING)
         obj.asInstanceOf[Utf8].toString
diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
index f25c9e593bf..43c6b1b554a 100644
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
+++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
@@ -80,12 +80,11 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
         case None => null
       }
     } else {
-      val keySchemaId = record.keySchemaId.getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
-      val keySchema = getSchemaById(keySchemaId)
-      keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchemaId, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
+      val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
+      keyOpt.map(keyJson => messageReader.readJson(keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
     }
     val valueSchema = getSchemaById(record.valueSchemaId)
-    val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
+    val valueBytes = messageReader.readJson(value, valueSchema, ConfluentUtils.valueSubject(topic))
     (keyBytes, valueBytes)
   }
   record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
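
Note: ConfluentAvroMessageReader now extends Confluent's stock AbstractKafkaAvroSerializer instead of the project's AbstractConfluentKafkaAvroSerializer, so serializeImpl registers (or looks up) the schema under the passed subject and emits the standard Confluent wire format; no DefaultAvroSchemaEvolution is applied, which is what the subject line's "doesn't do anything with schema evolution" refers to. A minimal sketch of how the returned Array[Byte] is laid out (the helper name is hypothetical, not part of this patch):

    import java.nio.ByteBuffer

    // Decomposes a Confluent wire-format payload: 1 magic byte (0x00),
    // a 4-byte big-endian schema registry id, then the raw Avro binary.
    def decomposeConfluentPayload(bytes: Array[Byte]): (Byte, Int, Array[Byte]) = {
      val buffer = ByteBuffer.wrap(bytes)
      val magicByte = buffer.get()
      val schemaId = buffer.getInt()
      val avroPayload = new Array[Byte](buffer.remaining())
      buffer.get(avroPayload)
      (magicByte, schemaId, avroPayload)
    }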
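
On the caller side, ConfluentAvroToJsonFormatter now resolves the schema itself and derives the subject from the topic, so readJson no longer needs the [K]/[V] type parameters: jsonToAvro always decodes through GenericData's datum reader, making the old specific-record branch (AvroUtils.isSpecificRecord[T]) unnecessary. A hedged usage sketch of the reworked API, assuming valueJson: io.circe.Json and topic: String are in scope:

    // Resolve the writer schema from the registry and serialize the JSON
    // under the value subject derived from the topic (conventionally "<topic>-value").
    val valueSchema: Schema = getSchemaById(record.valueSchemaId)
    val valueBytes: Array[Byte] =
      messageReader.readJson(valueJson, valueSchema, ConfluentUtils.valueSubject(topic))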