Fix (use AbstractConfluentKafkaAvroSerializer in ConfluentAvroMessageReader)
gskrobisz committed Jun 29, 2021
1 parent 0e46765 commit 811f0b2
Showing 5 changed files with 32 additions and 13 deletions.
@@ -3,31 +3,32 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
import io.circe.Json
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
- import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import org.apache.avro.Schema.Type
import org.apache.avro.io.DecoderFactory
import org.apache.avro.util.Utf8
import org.apache.avro.Schema
import org.apache.kafka.common.errors.SerializationException
import pl.touk.nussknacker.engine.avro.AvroUtils
- import pl.touk.nussknacker.engine.avro.schema.DatumReaderWriterMixin
+ import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
+ import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer

import scala.reflect.ClassTag

/**
* @param schemaRegistryClient schema registry client
*/
private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
- extends AbstractKafkaAvroSerializer with DatumReaderWriterMixin {
+ extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {

schemaRegistry = schemaRegistryClient

private val decoderFactory = DecoderFactory.get

- def readJson[T: ClassTag](jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
+ def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
try {
val avroObj = jsonToAvro[T](jsonObj, schema)
- serializeImpl(subject, avroObj, new AvroSchema(schema))
+ val avroSchema = new AvroSchema(schema)
+ writeData(avroObj, avroSchema.rawSchema(), schemaId)
} catch {
case ex: Exception =>
throw new SerializationException("Error reading from input", ex)
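The switch from serializeImpl(subject, avroObj, ...) to writeData(avroObj, avroSchema.rawSchema(), schemaId) means the reader serializes the message against the exact writer schema id it is handed, instead of resolving an id by subject. Below is a minimal, illustrative sketch of the Confluent wire format such a call is expected to produce (magic byte, 4-byte schema id, Avro binary payload); the object and method names are hypothetical and not part of Nussknacker or Confluent.

// Sketch only: Confluent Avro wire format = magic byte 0x0, then a 4-byte
// big-endian schema id, then the Avro binary encoding of the record.
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

object ConfluentWireFormatSketch {
  def encodeConfluentPayload(record: GenericRecord, schema: Schema, schemaId: Int): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(0)                                               // magic byte
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array()) // writer schema id
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray                                            // bytes ready for Kafka
  }
}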
@@ -81,10 +81,10 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
}
} else {
val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
- keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
+ keyOpt.map(keyJson => messageReader.readJson[K](keyJson, record.keySchemaId.get, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
}
val valueSchema = getSchemaById(record.valueSchemaId)
- val valueBytes = messageReader.readJson[V](value, valueSchema, ConfluentUtils.valueSubject(topic))
+ val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
(keyBytes, valueBytes)
}
record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
@@ -23,6 +23,23 @@ import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic, Record

import scala.reflect.ClassTag

+ /**
+  * Base implementation of a KafkaSource factory with Avro schema support. It is based on GenericNodeTransformation to
+  * - allow key and value type identification based on the Schema Registry and
+  * - allow Context initialization with the event's value, key and metadata.
+  * You can provide schemas for both key and value. When useStringForKey = true (see KafkaConfig) the contents of the event's key
+  * are treated as String (this is the default scenario).
+  * The reader schema used at runtime is determined by topic and version.
+  * The reader schema can differ from the schema used by the writer (e.g. when the writer produces an event with a newer schema); in that case "schema evolution" may be required.
+  * For SpecificRecord use SpecificRecordKafkaAvroSourceFactory.
+  * Assumptions:
+  * 1. Every event that comes in has its key and value schemas registered in the Schema Registry.
+  * 2. The Avro payload must include the schema id for both Generic and Specific records (to provide "schema evolution" we need to know the exact writer's schema).
+  *
+  * @param schemaRegistryProvider - provides a set of strategies for serialization and deserialization during event processing and/or testing.
+  * @tparam K - type of the event's key, used to determine whether the key object is Specific or Generic (for GenericRecords use Any)
+  * @tparam V - type of the event's value, used to determine whether the value object is Specific or Generic (for GenericRecords use Any)
+  */
class KafkaAvroSourceFactory[K:ClassTag, V:ClassTag](val schemaRegistryProvider: SchemaRegistryProvider,
val processObjectDependencies: ProcessObjectDependencies,
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]])
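Taken together with the added documentation, a minimal usage sketch based only on the constructor shown above looks like the following; schemaRegistryProvider and processObjectDependencies are assumed to come from the surrounding model configuration and are not defined in this commit.

// Sketch only: a GenericRecord source factory (use Any for generic key/value types).
val genericAvroSource = new KafkaAvroSourceFactory[Any, Any](
  schemaRegistryProvider,     // (de)serialization strategies, e.g. Confluent-based
  processObjectDependencies,  // standard Nussknacker process dependencies
  timestampAssigner = None    // no custom timestamp/watermark handling
)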
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResu
import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData, VariableConstants}
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
- import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
+ import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaVersionOption}
import pl.touk.nussknacker.engine.compile.ExpressionCompiler
@@ -24,7 +24,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

import java.nio.charset.StandardCharsets
- import java.time.{Instant, LocalDateTime, ZoneOffset}
+ import java.time.{LocalDateTime, ZoneOffset}
import scala.collection.immutable.ListMap

class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvroSourceSpecMixin {
@@ -62,9 +62,9 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
test("should read last generated specific record with logical types ") {
val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
val decimal = new java.math.BigDecimal("12.34")
- val givenObj = GeneratedAvroClassWithLogicalTypesNewSchema(date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, "loremipsum", decimal)
+ val givenObj = new GeneratedAvroClassWithLogicalTypes("loremipsum", date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, decimal)

- roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
+ roundTripKeyValueObject(specificSourceFactory[GeneratedAvroClassWithLogicalTypes], useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read generated record in v2") {
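The rewritten expectation above builds the generated SpecificRecord directly; its constructor arguments correspond to Avro logical types, which the code generator maps to Java time and decimal classes. A short sketch of that mapping, assuming the standard Avro logical-type conversions (the exact logical types in the generated schema are not shown in this diff):

// Sketch only: assumed mapping of constructor arguments to Avro logical types.
import java.time.{LocalDateTime, ZoneOffset}

val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
val expected = new GeneratedAvroClassWithLogicalTypes(
  "loremipsum",                     // plain string field
  date.toInstant(ZoneOffset.UTC),   // timestamp-millis -> java.time.Instant
  date.toLocalDate,                 // date             -> java.time.LocalDate
  date.toLocalTime,                 // time-millis      -> java.time.LocalTime
  new java.math.BigDecimal("12.34") // decimal          -> java.math.BigDecimal
)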
@@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.avro.{AvroUtils, TestSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
- import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
+ import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor

@@ -55,6 +55,7 @@ trait KafkaAvroSourceSpecMixin {
""".stripMargin
)

+ // ALL schemas, for both Generic and Specific records, must be registered in the schema registry
val schemaRegistryMockClient: CSchemaRegistryClient = new MockConfluentSchemaRegistryClientBuilder()
.register(RecordTopic, FullNameV1.schema, 1, isKey = false)
.register(RecordTopic, FullNameV2.schema, 2, isKey = false)
@@ -65,7 +66,7 @@ trait KafkaAvroSourceSpecMixin {
.register(IntTopicWithKey, IntSchema, 1, isKey = true)
.register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
.register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
- .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypesNewSchema.schema, 1, isKey = false)
+ .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypes.getClassSchema, 1, isKey = false)
.build

val factory: CachedConfluentSchemaRegistryClientFactory = TestSchemaRegistryClientFactory(schemaRegistryMockClient)
