
Commit de8a39b

Fix (use AbstractConfluentKafkaAvroSerializer in ConfluentAvroMessageReader)
gskrobisz committed Jun 29, 2021
1 parent 0e46765 commit de8a39b
Showing 6 changed files with 30 additions and 19 deletions.
@@ -3,31 +3,32 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
import io.circe.Json
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import org.apache.avro.Schema.Type
import org.apache.avro.io.DecoderFactory
import org.apache.avro.util.Utf8
import org.apache.avro.Schema
import org.apache.kafka.common.errors.SerializationException
import pl.touk.nussknacker.engine.avro.AvroUtils
-import pl.touk.nussknacker.engine.avro.schema.DatumReaderWriterMixin
+import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
+import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer

import scala.reflect.ClassTag

/**
* @param schemaRegistryClient schema registry client
*/
private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
-  extends AbstractKafkaAvroSerializer with DatumReaderWriterMixin {
+  extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {

schemaRegistry = schemaRegistryClient

private val decoderFactory = DecoderFactory.get

-  def readJson[T: ClassTag](jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
+  def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
try {
val avroObj = jsonToAvro[T](jsonObj, schema)
-      serializeImpl(subject, avroObj, new AvroSchema(schema))
+      val avroSchema = new AvroSchema(schema)
+      writeData(avroObj, avroSchema.rawSchema(), schemaId)
} catch {
case ex: Exception =>
throw new SerializationException("Error reading from input", ex)
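For orientation, here is a minimal, self-contained sketch (not part of the commit) of the byte layout the inherited writeData emits — the standard Confluent wire format: magic byte 0, a 4-byte big-endian schema id, then the Avro binary payload. The payload argument is a hypothetical stand-in for the bytes a DatumWriter would produce:

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

object ConfluentWireFormatSketch {
  // Magic byte 0x0, then the 4-byte big-endian schema id, then the Avro body.
  def frame(schemaId: Int, payload: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(0)                                               // magic byte
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array()) // schema id
    out.write(payload)                                         // Avro binary payload
    out.toByteArray
  }
}

This is why the subject-based serializeImpl call can give way to writeData with an explicit schemaId: the id the formatter already captured alongside the record is all the wire format needs.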
@@ -81,10 +81,10 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
}
} else {
val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
-      keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
+      keyOpt.map(keyJson => messageReader.readJson[K](keyJson, record.keySchemaId.get, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
}
val valueSchema = getSchemaById(record.valueSchemaId)
-    val valueBytes = messageReader.readJson[V](value, valueSchema, ConfluentUtils.valueSubject(topic))
+    val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
(keyBytes, valueBytes)
}
record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
@@ -6,6 +6,7 @@ import io.confluent.kafka.serializers.{AbstractKafkaAvroSerializer, AbstractKafk
import org.apache.avro.Schema
import org.apache.avro.generic.GenericContainer
import org.apache.avro.io.{Encoder, EncoderFactory}
+import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.errors.SerializationException
import pl.touk.nussknacker.engine.avro.schema.{AvroSchemaEvolution, DatumReaderWriterMixin}

@@ -38,6 +39,10 @@ class AbstractConfluentKafkaAvroSerializer(avroSchemaEvolution: AvroSchemaEvolut

try {
val schemaId: Int = autoRegisterSchemaIfNeeded(topic, data, isKey, avroSchema)
+      // val schemaId: Integer = data match {
+      //   case _: SpecificRecord => null // SpecificRecord's schema does not have registry id
+      //   case _ => autoRegisterSchemaIfNeeded(topic, data, isKey, avroSchema)
+      // }
writeData(record, avroSchema.rawSchema(), schemaId)
} catch {
case exc@(_: RuntimeException | _: IOException) =>
@@ -46,7 +51,7 @@ class AbstractConfluentKafkaAvroSerializer(avroSchemaEvolution: AvroSchemaEvolut
}
}

-  private def autoRegisterSchemaIfNeeded(topic: String, data: Any, isKey: Boolean, avroSchema: AvroSchema) = {
+  private def autoRegisterSchemaIfNeeded(topic: String, data: Any, isKey: Boolean, avroSchema: AvroSchema): Integer = {
try {
val subject = getSubjectName(topic, isKey, data, avroSchema)
if (this.autoRegisterSchema) {
@@ -60,7 +65,7 @@ class AbstractConfluentKafkaAvroSerializer(avroSchemaEvolution: AvroSchemaEvolut
}
}

-  protected def writeData(data: Any, avroSchema: Schema, schemaId: Int): Array[Byte] =
+  protected def writeData(data: Any, avroSchema: Schema, schemaId: Integer): Array[Byte] =
Using.resource(new ByteArrayOutputStream) { out =>

writeHeader(data, avroSchema, schemaId, out)
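A side note on the Int → Integer change above: Scala's Int is a value type and cannot be null, while the commented-out SpecificRecord branch wants to express "no registry id". A tiny illustrative sketch of the distinction (assuming that is the motivation; the commit itself leaves the branch commented out):

object NullableSchemaIdSketch {
  val noId: java.lang.Integer = null // compiles: Integer is a reference type
  // val bad: Int = null             // would not compile: Int is a value type
}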
@@ -217,6 +217,14 @@ trait KafkaAvroSpecMixin extends FunSuite with KafkaWithSchemaRegistryOperations

protected def readLastMessageAndVerify(sourceFactory: KafkaAvroSourceFactory[Any, Any], topic: String, versionOption: SchemaVersionOption, givenKey: Any, givenValue: Any):
Validated[NonEmptyList[ProcessCompilationError], Assertion] = {
+    readLastMessage(sourceFactory, topic, versionOption).map(deserializedObj => {
+      deserializedObj.key() shouldEqual givenKey
+      deserializedObj.value() shouldEqual givenValue
+    })
+  }
+
+  protected def readLastMessage(sourceFactory: KafkaAvroSourceFactory[Any, Any], topic: String, versionOption: SchemaVersionOption):
+    Validated[NonEmptyList[ProcessCompilationError], ConsumerRecord[Any, Any]] = {
val parameterValues = sourceFactory match {
case _ : SpecificRecordKafkaAvroSourceFactory[_] => Map(KafkaAvroBaseTransformer.TopicParamName -> topic)
case _ => Map(KafkaAvroBaseTransformer.TopicParamName -> topic, KafkaAvroBaseTransformer.SchemaVersionParamName -> versionOptionToString(versionOption))
@@ -225,10 +233,7 @@ trait KafkaAvroSpecMixin extends FunSuite with KafkaWithSchemaRegistryOperations
.map(source => {
val bytes = source.generateTestData(1)
info("test object: " + new String(bytes, StandardCharsets.UTF_8))
-        val deserializedObj = source.testDataParser.parseTestData(bytes).head.asInstanceOf[ConsumerRecord[Any, Any]]
-
-        deserializedObj.key() shouldEqual givenKey
-        deserializedObj.value() shouldEqual givenValue
+        source.testDataParser.parseTestData(bytes).head.asInstanceOf[ConsumerRecord[Any, Any]]
})
}

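With the split above, tests can obtain the parsed ConsumerRecord without baked-in assertions. A hedged usage sketch from inside a suite mixing in KafkaAvroSpecMixin (the factory, topic, and expected values are hypothetical):

// readLastMessage returns Validated[NonEmptyList[ProcessCompilationError], ConsumerRecord[Any, Any]],
// so the caller decides what, if anything, to assert on the record.
readLastMessage(avroSourceFactory, "some-topic", LatestSchemaVersion).map { record =>
  record.key() shouldEqual expectedKey
  record.value() shouldEqual expectedValue
}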
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResu
import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData, VariableConstants}
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
-import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaVersionOption}
import pl.touk.nussknacker.engine.compile.ExpressionCompiler
@@ -24,7 +24,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

import java.nio.charset.StandardCharsets
-import java.time.{Instant, LocalDateTime, ZoneOffset}
+import java.time.{LocalDateTime, ZoneOffset}
import scala.collection.immutable.ListMap

class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvroSourceSpecMixin {
@@ -62,9 +62,9 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
test("should read last generated specific record with logical types ") {
val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
val decimal = new java.math.BigDecimal("12.34")
-    val givenObj = GeneratedAvroClassWithLogicalTypesNewSchema(date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, "loremipsum", decimal)
+    val givenObj = new GeneratedAvroClassWithLogicalTypes("loremipsum", date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, decimal)

-    roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
+    roundTripKeyValueObject(specificSourceFactory[GeneratedAvroClassWithLogicalTypes], useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read generated record in v2") {
@@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.avro.{AvroUtils, TestSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
-import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor

@@ -65,7 +65,7 @@ trait KafkaAvroSourceSpecMixin {
.register(IntTopicWithKey, IntSchema, 1, isKey = true)
.register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
.register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
-    .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypesNewSchema.schema, 1, isKey = false)
+    //??? .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypes.getClassSchema, 1, isKey = false)
.build

val factory: CachedConfluentSchemaRegistryClientFactory = TestSchemaRegistryClientFactory(schemaRegistryMockClient)
