diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
index 873f9a9ab4b..f8bb9420496 100644
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
+++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
@@ -3,14 +3,14 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
 import io.circe.Json
 import io.confluent.kafka.schemaregistry.avro.AvroSchema
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
 import org.apache.avro.Schema.Type
 import org.apache.avro.io.DecoderFactory
 import org.apache.avro.util.Utf8
 import org.apache.avro.Schema
 import org.apache.kafka.common.errors.SerializationException
 import pl.touk.nussknacker.engine.avro.AvroUtils
-import pl.touk.nussknacker.engine.avro.schema.DatumReaderWriterMixin
+import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
+import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer

 import scala.reflect.ClassTag

@@ -18,16 +18,16 @@ import scala.reflect.ClassTag
  * @param schemaRegistryClient schema registry client
  */
 private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
-  extends AbstractKafkaAvroSerializer with DatumReaderWriterMixin {
+  extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {

   schemaRegistry = schemaRegistryClient

   private val decoderFactory = DecoderFactory.get

-  def readJson[T: ClassTag](jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
+  def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
     try {
       val avroObj = jsonToAvro[T](jsonObj, schema)
-      serializeImpl(subject, avroObj, new AvroSchema(schema))
+      writeData(avroObj, schema, schemaId)
     } catch {
       case ex: Exception =>
         throw new SerializationException("Error reading from input", ex)
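
Reviewer note: readJson now frames the payload with the writer schema id taken from
the consumed record, instead of resolving a schema by subject. For context, below is
a minimal, self-contained sketch of the standard Confluent wire format (magic byte 0,
4-byte big-endian schema id, then the Avro binary body) that schema-id-based writing
targets. This is an illustration under that assumption, not the project's writeData
implementation:

  import java.io.ByteArrayOutputStream
  import java.nio.ByteBuffer
  import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
  import org.apache.avro.io.EncoderFactory

  def confluentFramedBytes(record: GenericRecord, schemaId: Int): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    out.write(0)                                               // magic byte
    out.write(ByteBuffer.allocate(4).putInt(schemaId).array()) // writer schema id
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }
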
diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
index 64ddeb73c7a..f25c9e593bf 100644
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
+++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroToJsonFormatterFactory.scala
@@ -80,11 +80,12 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
         case None => null
       }
     } else {
-      val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
-      keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
+      val keySchemaId = record.keySchemaId.getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
+      val keySchema = getSchemaById(keySchemaId)
+      keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchemaId, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
     }
     val valueSchema = getSchemaById(record.valueSchemaId)
-    val valueBytes = messageReader.readJson[V](value, valueSchema, ConfluentUtils.valueSubject(topic))
+    val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
     (keyBytes, valueBytes)
   }
   record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
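
Reviewer note: the formatter now threads the concrete writer schema ids through
readJson for both key and value. Those ids come from the consumed record; a sketch of
how such an id is recovered from a Confluent-framed payload (same assumed wire format
as in the writer sketch above, not the project's actual parsing code):

  import java.nio.ByteBuffer

  def writerSchemaId(payload: Array[Byte]): Int = {
    val buffer = ByteBuffer.wrap(payload)
    require(buffer.get() == 0, "unknown magic byte") // first byte must be 0
    buffer.getInt()                                  // 4-byte big-endian schema id
  }
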
diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala
index 8f012b2e577..76a3109c122 100644
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala
+++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala
@@ -23,6 +23,23 @@ import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic, Record

 import scala.reflect.ClassTag

+/**
+ * Base implementation of the KafkaSource factory with Avro schema support. It is based on GenericNodeTransformation to
+ * - allow key and value type identification based on the Schema Registry and
+ * - allow Context initialization with the event's value, key and metadata.
+ * You can provide schemas for both key and value. When useStringForKey = true (see KafkaConfig) the contents of the event's key
+ * are treated as String (this is the default scenario).
+ * The reader schema used at runtime is determined by topic and version.
+ * The reader schema can differ from the schema used by the writer (e.g. when the writer produces an event with a newer schema); in that case "schema evolution" may be required.
+ * For SpecificRecord use SpecificRecordKafkaAvroSourceFactory.
+ * Assumptions:
+ * 1. Every event that comes in has its key and value schemas registered in the Schema Registry.
+ * 2. The Avro payload must include the schema id for both Generic and Specific records (to provide "schema evolution" we need to know the exact writer's schema).
+ *
+ * @param schemaRegistryProvider - provides a set of strategies for serialization and deserialization, used during event processing and/or testing
+ * @tparam K - type of the event's key, used to determine if the key object is Specific or Generic (for GenericRecords use Any)
+ * @tparam V - type of the event's value, used to determine if the value object is Specific or Generic (for GenericRecords use Any)
+ */
 class KafkaAvroSourceFactory[K:ClassTag, V:ClassTag](val schemaRegistryProvider: SchemaRegistryProvider,
                                                      val processObjectDependencies: ProcessObjectDependencies,
                                                      timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]])
diff --git a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroPayloadSourceFactorySpec.scala b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroPayloadSourceFactorySpec.scala
index aed2ba0427f..57ff5ed2162 100644
--- a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroPayloadSourceFactorySpec.scala
+++ b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroPayloadSourceFactorySpec.scala
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResu
 import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData, VariableConstants}
 import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
 import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
-import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
 import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
 import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaVersionOption}
 import pl.touk.nussknacker.engine.compile.ExpressionCompiler
@@ -24,6 +24,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
 import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

 import java.nio.charset.StandardCharsets
+import java.time.{LocalDateTime, ZoneOffset}
 import scala.collection.immutable.ListMap

 class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvroSourceSpecMixin {
@@ -52,6 +53,20 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
     roundTripKeyValueObject(specificSourceFactory[FullNameV1], useStringForKey = true, RecordTopic, ExistingSchemaVersion(1), null, givenObj)
   }

+  test("should read last generated generic record with logical types") {
+    val givenObj = PaymentDate.record
+
+    roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, PaymentDateTopic, ExistingSchemaVersion(1), "", givenObj)
+  }
+
+  test("should read last generated specific record with logical types") {
+    val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
+    val decimal = new java.math.BigDecimal("12.34")
+    val givenObj = new GeneratedAvroClassWithLogicalTypes("loremipsum", date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, decimal)
+
+    roundTripKeyValueObject(specificSourceFactory[GeneratedAvroClassWithLogicalTypes], useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
+  }
+
   test("should read generated record in v2") {
     val givenObj = FullNameV2.createRecord("Jan", "Maria", "Kowalski")

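
Reviewer note: the "schema evolution" mentioned in the new scaladoc boils down to
decoding with the exact writer schema while resolving to the configured reader
schema. A minimal illustration with plain Avro APIs (DefaultAvroSchemaEvolution
presumably wraps an equivalent mechanism; this sketch is not the project's
implementation):

  import org.apache.avro.Schema
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
  import org.apache.avro.io.DecoderFactory

  def evolve(payload: Array[Byte], writerSchema: Schema, readerSchema: Schema): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    reader.read(null, DecoderFactory.get().binaryDecoder(payload, null))
  }
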
diff --git a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceSpecMixin.scala b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceSpecMixin.scala
index eb411dfa0ca..1c26b2b9997 100644
--- a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceSpecMixin.scala
+++ b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceSpecMixin.scala
@@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
 import pl.touk.nussknacker.engine.api.typed.typing
 import pl.touk.nussknacker.engine.avro.{AvroUtils, TestSchemaRegistryClientFactory}
 import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
-import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, PaymentDate, PaymentV1}
 import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
 import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor

@@ -29,6 +29,8 @@ trait KafkaAvroSourceSpecMixin {
   val IntTopicWithKey: String = "testAvroIntTopic1WithKey"
   val IntTopicNoKey: String = "testAvroIntTopic1NoKey"
   val InvalidDefaultsTopic: String = "testAvroInvalidDefaultsTopic1"
+  val PaymentDateTopic: String = "testPaymentDateTopic"
+  val GeneratedWithLogicalTypesTopic: String = "testGeneratedWithLogicalTypesTopic"

   val IntSchema: Schema = AvroUtils.parseSchema(
     """{
@@ -53,6 +55,7 @@ trait KafkaAvroSourceSpecMixin {
     """.stripMargin
   )

+  // ALL schemas, for both Generic and Specific records, must be registered in the schema registry
   val schemaRegistryMockClient: CSchemaRegistryClient = new MockConfluentSchemaRegistryClientBuilder()
     .register(RecordTopic, FullNameV1.schema, 1, isKey = false)
     .register(RecordTopic, FullNameV2.schema, 2, isKey = false)
@@ -62,6 +65,8 @@ trait KafkaAvroSourceSpecMixin {
     .register(IntTopicWithKey, IntSchema, 1, isKey = false)
     .register(IntTopicWithKey, IntSchema, 1, isKey = true)
     .register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
+    .register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
+    .register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypes.getClassSchema, 1, isKey = false)
     .build

   val factory: CachedConfluentSchemaRegistryClientFactory = TestSchemaRegistryClientFactory(schemaRegistryMockClient)
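
Reviewer note: the builder registers key and value schemas under separate subjects
per topic, which is why isKey is passed explicitly. Assuming Confluent's default
TopicNameStrategy (which the keySubject/valueSubject helpers touched by this PR
presumably follow), the subject names derive from the topic like this:

  def subject(topic: String, isKey: Boolean): String =
    if (isKey) s"$topic-key" else s"$topic-value"
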
diff --git a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TopicSelectionStrategySpec.scala b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TopicSelectionStrategySpec.scala
index 202d5ba7f8e..7b185a83a6e 100644
--- a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TopicSelectionStrategySpec.scala
+++ b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TopicSelectionStrategySpec.scala
@@ -19,7 +19,7 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource

   test("all topic strategy test") {
     val strategy = new AllTopicsSelectionStrategy()
-    strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic))
+    strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic, PaymentDateTopic, GeneratedWithLogicalTypesTopic))
   }

   test("topic filtering strategy test") {
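
Reviewer note: every topic registered in the mock above has to show up in this
expectation, because the strategy derives the topic list from the registered
subjects. A sketch of that derivation under the same assumed subject-naming
convention (the real strategy presumably reads getAllSubjects from the schema
registry client):

  def topicsFromSubjects(subjects: List[String]): Set[String] =
    subjects.map(_.stripSuffix("-key").stripSuffix("-value")).toSet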