From b7c042461b1b5212f04945f46796d4dbbb9115a9 Mon Sep 17 00:00:00 2001
From: gskrobisz
Date: Tue, 13 Apr 2021 17:19:17 +0200
Subject: [PATCH] =?UTF-8?q?ClassTag=20is=20provided=20in=20params=20in=20d?=
 =?UTF-8?q?eserialization=20schema=20factory=20=E2=80=A6=20(#1499)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* ClassTag is provided in params so one deserialization schema factory will work for multiple deserialization schemas

* Code review fixes

* Code review fixes - code formatting

* Migration guide.

* Add AvroSchemaDeterminer with fallback to fixed String schema.
---
 docs/Changelog.md | 1 +
 docs/MigrationGuide.md | 4 +
 .../engine/avro/AvroSchemaDeterminer.scala | 6 ++
 .../avro/KafkaAvroBaseTransformer.scala | 17 ++--
 .../BasedOnVersionAvroSchemaDeterminer.scala | 29 +++++-
 .../confluent/ConfluentUtils.scala | 6 +-
 ...afkaAvroDeserializationSchemaFactory.scala | 19 ++--
 .../ConfluentKafkaAvroDeserializer.scala | 4 +-
 ...afkaAvroDeserializationSchemaFactory.scala | 92 ++++++++++---------
 .../avro/sink/KafkaAvroSinkFactory.scala | 4 +-
 .../sink/KafkaAvroSinkFactoryWithEditor.scala | 4 +-
 .../source/BaseKafkaAvroSourceFactory.scala | 24 +++--
 .../avro/source/KafkaAvroSourceFactory.scala | 34 +++++--
 ...SpecificRecordKafkaAvroSourceFactory.scala | 14 ++-
 ...onfluentKafkaAvroDeserializationSpec.scala | 6 +-
 ...ueKafkaAvroDeserializerSchemaFactory.scala | 15 +--
 16 files changed, 177 insertions(+), 102 deletions(-)

diff --git a/docs/Changelog.md b/docs/Changelog.md
index cda2b2c45c2..97c4edf1ca4 100644
--- a/docs/Changelog.md
+++ b/docs/Changelog.md
@@ -35,6 +35,7 @@ Nussknacker versions
 * [#1405](https://github.com/TouK/nussknacker/pull/1405) 'KafkaAvroSinkFactoryWithEditor' for more user-friendly Avro message definition.
 * [#1510](https://github.com/TouK/nussknacker/pull/1510) `FlinkSource` API allows to create stream of `Context` (FlinkSource API and test support API refactoring).
 * [#1497](https://github.com/TouK/nussknacker/pull/1497) Initial support for multiple (named) schedules in `PeriodicProcessManager`
+* [#1499](https://github.com/TouK/nussknacker/pull/1499) ClassTag is provided in params in avro key-value deserialization schema factory: `KafkaAvroKeyValueDeserializationSchemaFactory`

 0.3.1 (not released yet)
 ------------------------
diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md
index d707b937259..841c626f8c0 100644
--- a/docs/MigrationGuide.md
+++ b/docs/MigrationGuide.md
@@ -49,6 +49,10 @@ To see biggest differences please consult the [changelog](Changelog.md).
   - For kafka sources `RecordFormatter` parses raw test data to `ConsumerRecord` which fits into deserializer (instead of `ProducerRecord` that required another transformation).
   - Definitions of names of common `Context` variables are moved to `VariableConstants` (instead of `Interpreter`).
 * [#1497](https://github.com/TouK/nussknacker/pull/1497) Changes in `PeriodicProcessManager`, change `PeriodicProperty` to `ScheduleProperty`
+* [#1499](https://github.com/TouK/nussknacker/pull/1499)
+  - trait `KafkaAvroDeserializationSchemaFactory` uses both key and value ClassTags and schemas (instead of value-only), check the order of parameters.
+ - ClassTag is provided in params in avro key-value deserialization schema factory: `KafkaAvroKeyValueDeserializationSchemaFactory` + - `BaseKafkaAvroSourceFactory` is able to read both key and value schema determiner to build proper DeserializationSchema (support for keys is not fully introduced in this change) ## In version 0.3.0 diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/AvroSchemaDeterminer.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/AvroSchemaDeterminer.scala index 707011afbb7..ddfac81adc2 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/AvroSchemaDeterminer.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/AvroSchemaDeterminer.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.avro import cats.data.Validated +import cats.data.Validated.Valid import org.apache.avro.Schema import org.apache.flink.formats.avro.typeutils.NkSerializableAvroSchema @@ -34,3 +35,8 @@ object RuntimeSchemaData { } class SchemaDeterminerError(message: String, cause: Throwable) extends RuntimeException(message, cause) + +case object FixedStringSchemaDeterminer extends AvroSchemaDeterminer { + override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = Valid(RuntimeSchemaData(Schema.create(Schema.Type.STRING), None)) + override def toRuntimeSchema(schemaUsedInTyping: RuntimeSchemaData): Option[RuntimeSchemaData] = None +} \ No newline at end of file diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/KafkaAvroBaseTransformer.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/KafkaAvroBaseTransformer.scala index 22a74f72ccd..ef19373a410 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/KafkaAvroBaseTransformer.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/KafkaAvroBaseTransformer.scala @@ -2,16 +2,15 @@ package pl.touk.nussknacker.engine.avro import cats.data.Validated.{Invalid, Valid} import cats.data.Writer -import pl.touk.nussknacker.engine.api.MetaData +import org.apache.avro.Schema import pl.touk.nussknacker.engine.api.context.ProcessCompilationError import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CustomNodeError, NodeId} import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputGenericNodeTransformation, TypedNodeDependencyValue} -import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, NodeDependency, Parameter, TypedNodeDependency} +import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.TopicParamName -import pl.touk.nussknacker.engine.avro.schemaregistry.{BasedOnVersionAvroSchemaDeterminer, SchemaRegistryClient, SchemaRegistryProvider, SchemaVersionOption} -import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactoryWithEditor.paramsDeterminedAfterSchema +import pl.touk.nussknacker.engine.avro.schemaregistry._ import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, PreparedKafkaTopic} import scala.reflect.ClassTag @@ -86,8 +85,14 @@ trait KafkaAvroBaseTransformer[T] extends 
SingleInputGenericNodeTransformation[T protected def parseVersionOption(versionOptionName: String): SchemaVersionOption = SchemaVersionOption.byName(versionOptionName) - protected def prepareSchemaDeterminer(preparedTopic: PreparedKafkaTopic, version: SchemaVersionOption): AvroSchemaDeterminer = { - new BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, version) + protected def prepareValueSchemaDeterminer(preparedTopic: PreparedKafkaTopic, version: SchemaVersionOption): AvroSchemaDeterminer = { + new BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, version, isKey = false) + } + + //TODO: add schema versioning for key schemas + protected def prepareKeySchemaDeterminer(preparedTopic: PreparedKafkaTopic): AvroSchemaDeterminer = { + val fallbackSchema = Schema.create(Schema.Type.STRING) + new BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, LatestSchemaVersion, isKey = true, fallbackSchema) } protected def topicParamStep(implicit nodeId: NodeId): NodeTransformationDefinition = { diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala index 557cb906400..e59c261c818 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala @@ -1,11 +1,14 @@ package pl.touk.nussknacker.engine.avro.schemaregistry import cats.data.Validated -import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, SchemaDeterminerError, RuntimeSchemaData} +import cats.data.Validated.Valid +import org.apache.avro.Schema +import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError} class BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient, topic: String, - versionOption: SchemaVersionOption) extends AvroSchemaDeterminer { + versionOption: SchemaVersionOption, + isKey: Boolean) extends AvroSchemaDeterminer { override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = { val version = versionOption match { @@ -13,9 +16,29 @@ class BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryCli case LatestSchemaVersion => None } schemaRegistryClient - .getFreshSchema(topic, version, isKey = false) + .getFreshSchema(topic, version, isKey = isKey) .leftMap(err => new SchemaDeterminerError(s"Fetching schema error for topic: $topic, version: $versionOption", err)) .map(withMetadata => RuntimeSchemaData(withMetadata.schema, Some(withMetadata.id))) } } + +class BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient, + topic: String, + versionOption: SchemaVersionOption, + isKey: Boolean, + schema: Schema + ) extends AvroSchemaDeterminer { + + override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = { + val version = versionOption match { + case ExistingSchemaVersion(v) => Some(v) + case LatestSchemaVersion => None + } + schemaRegistryClient + .getFreshSchema(topic, version, isKey = isKey) + .map(withMetadata => RuntimeSchemaData(withMetadata.schema, Some(withMetadata.id))) + .orElse(Valid(RuntimeSchemaData(schema, None))) + } + +} 
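Note on the fallback behaviour introduced above: `BasedOnVersionWithFallbackAvroSchemaDeterminer` asks the schema registry first and, when the lookup fails (for example when no key schema is registered for the topic), falls back to a fixed schema, while `FixedStringSchemaDeterminer` always types keys as plain strings. The snippet below is a minimal, self-contained sketch of that same `orElse` pattern using only cats and Avro; `KeySchemaFallbackSketch`, `lookupKeySchema` and `determineKeySchema` are hypothetical names standing in for the real `SchemaRegistryClient` call, not part of this patch.

import cats.data.Validated
import cats.data.Validated.Valid
import org.apache.avro.Schema

object KeySchemaFallbackSketch extends App {

  // Hypothetical stand-in for SchemaRegistryClient.getFreshSchema: here it always fails,
  // as a real client would when no key schema exists for the topic's key subject.
  def lookupKeySchema(topic: String): Validated[RuntimeException, Schema] =
    Validated.invalid(new RuntimeException(s"No key schema registered for topic $topic"))

  // Same shape as the determiner above: ask the registry, otherwise fall back to a fixed STRING schema.
  def determineKeySchema(topic: String): Validated[RuntimeException, Schema] =
    lookupKeySchema(topic).orElse(Valid(Schema.create(Schema.Type.STRING)))

  println(determineKeySchema("transactions")) // prints Valid("string") - the fallback schema is used
}

With a registry entry present, `lookupKeySchema` would return `Valid` and the fallback schema would never be consulted, which is why the determiner can be used unconditionally for keys even before key schema support is fully introduced.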
diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentUtils.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentUtils.scala index fd7bbd98604..3c98d43bd43 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentUtils.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentUtils.scala @@ -45,7 +45,11 @@ object ConfluentUtils extends LazyLogging { def parsePayloadToByteBuffer(payload: Array[Byte]): Validated[IllegalArgumentException, ByteBuffer] = { val buffer = ByteBuffer.wrap(payload) - if (buffer.get != MagicByte) + if (buffer.array().isEmpty) + // Here parsed payload is an empty buffer. In that case buffer.get below raises "java.nio.BufferUnderflowException". + // This usually happens when the content of key or value is null. + Validated.invalid(new IllegalArgumentException("Buffer is empty")) + else if (buffer.get != MagicByte) Validated.invalid(new IllegalArgumentException("Unknown magic byte!")) else Validated.valid(buffer) diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala index 4439e8c76c1..44a78f5d0f4 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala @@ -1,12 +1,9 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization import com.typesafe.scalalogging.LazyLogging -import org.apache.avro.specific.SpecificRecordBase import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.formats.avro.typeutils.{LogicalTypesAvroTypeInfo, LogicalTypesGenericRecordAvroTypeInfo, LogicalTypesGenericRecordWithSchemaIdAvroTypeInfo} import org.apache.kafka.common.errors.SerializationException import org.apache.kafka.common.serialization.Deserializer -import pl.touk.nussknacker.engine.avro.kryo.KryoGenericRecordSchemaIdSerializationSupport import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentUtils import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory import pl.touk.nussknacker.engine.avro.serialization.{KafkaAvroKeyValueDeserializationSchemaFactory, KafkaAvroValueDeserializationSchemaFactory} @@ -51,16 +48,16 @@ class ConfluentKafkaAvroDeserializationSchemaFactory(schemaRegistryClientFactory abstract class ConfluentKeyValueKafkaAvroDeserializationFactory(schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory) extends KafkaAvroKeyValueDeserializationSchemaFactory with ConfluentKafkaAvroDeserializerFactory { - override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K] = - createDeserializer[K](schemaRegistryClientFactory, kafkaConfig, None, isKey = true)(keyClassTag) + override protected def createKeyDeserializer[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[K] = + 
createDeserializer[K](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = true) - override protected def createKeyTypeInfo(kafkaConfig: KafkaConfig): TypeInformation[K] = - createTypeInfo[K](kafkaConfig, None)(keyClassTag) + override protected def createKeyTypeInfo[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[K] = + createTypeInfo[K](kafkaConfig, schemaDataOpt) - override protected def createValueDeserializer(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] = - createDeserializer[V](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = false)(valueClassTag) + override protected def createValueDeserializer[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] = + createDeserializer[V](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = false) - override protected def createValueTypeInfo(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] = - createTypeInfo[V](kafkaConfig, schemaDataOpt)(valueClassTag) + override protected def createValueTypeInfo[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] = + createTypeInfo[V](kafkaConfig, schemaDataOpt) } diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializer.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializer.scala index e64c4c24918..ea952b44ec4 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializer.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializer.scala @@ -34,8 +34,8 @@ class ConfluentKafkaAvroDeserializer[T](kafkaConfig: KafkaConfig, schemaData: Ru KryoGenericRecordSchemaIdSerializationSupport.schemaIdSerializationEnabled(kafkaConfig) override def deserialize(topic: String, data: Array[Byte]): T = { - val record = deserialize(topic, isKey, data, schemaData) - record.asInstanceOf[T] + val deserializedData = deserialize(topic, isKey, data, schemaData) + deserializedData.asInstanceOf[T] } override def close(): Unit = {} diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/serialization/KafkaAvroDeserializationSchemaFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/serialization/KafkaAvroDeserializationSchemaFactory.scala index 40bcde13032..0de700cae3b 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/serialization/KafkaAvroDeserializationSchemaFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/serialization/KafkaAvroDeserializationSchemaFactory.scala @@ -1,5 +1,7 @@ package pl.touk.nussknacker.engine.avro.serialization +import java.nio.charset.StandardCharsets + import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema import org.apache.kafka.clients.consumer.ConsumerRecord @@ -16,14 +18,20 @@ import scala.reflect._ trait KafkaAvroDeserializationSchemaFactory extends Serializable { /** - * Prepare Flink's KafkaDeserializationSchema based on provided information. 
- * @param schemaDataOpt Schema to which will be used as a reader schema. In case of None, will be used the same schema as writer schema. - * @param kafkaConfig Configuration of integration with Kafka - * @tparam T Type that should be produced by deserialization schema. It is important parameter, because factory can - * use other deserialization strategy base on it or provide different TypeInformation - * @return KafkaDeserializationSchema - */ - def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T] + * Prepare Flink's KafkaDeserializationSchema based on provided information. + * + * @param kafkaConfig Configuration of integration with Kafka. + * @param keySchemaDataOpt Schema which will be used as a key reader schema. + * @param valueSchemaDataOpt Schema which will be used as a value reader schema. In case of None, writer schema will be used. + * @tparam K Type that should be produced by key deserialization schema. + * @tparam V Type that should be produced by value deserialization schema. It is important parameter, because factory can + * use other deserialization strategy base on it or provide different TypeInformation + * @return KafkaDeserializationSchema + */ + def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig, + keySchemaDataOpt: Option[RuntimeSchemaData] = None, + valueSchemaDataOpt: Option[RuntimeSchemaData] + ): KafkaDeserializationSchema[Any] } @@ -38,20 +46,24 @@ abstract class KafkaAvroValueDeserializationSchemaFactory protected def createValueTypeInfo[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[T] - override def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T] = { - new KafkaDeserializationSchema[T] { + override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig, + keySchemaDataOpt: Option[RuntimeSchemaData] = None, + valueSchemaDataOpt: Option[RuntimeSchemaData] + ): KafkaDeserializationSchema[Any] = { + new KafkaDeserializationSchema[V] { @transient - private lazy val deserializer = createValueDeserializer[T](schemaDataOpt, kafkaConfig) + private lazy val deserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig) - override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): T = { + override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): V = { val value = deserializer.deserialize(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.value()) value } - override def isEndOfStream(nextElement: T): Boolean = false + override def isEndOfStream(nextElement: V): Boolean = false - override def getProducedType: TypeInformation[T] = createValueTypeInfo(schemaDataOpt, kafkaConfig) + override def getProducedType: TypeInformation[V] = createValueTypeInfo(valueSchemaDataOpt, kafkaConfig) } + .asInstanceOf[KafkaDeserializationSchema[Any]] } } @@ -64,53 +76,47 @@ abstract class KafkaAvroValueDeserializationSchemaFactory abstract class KafkaAvroKeyValueDeserializationSchemaFactory extends KafkaAvroDeserializationSchemaFactory { - protected type K - - protected type V - protected type O - // TODO Make this provided in params so one deserialization schema factory will work for multiple deserialization schemas - protected def keyClassTag: ClassTag[K] + protected def createKeyDeserializer[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[K] - protected def 
valueClassTag: ClassTag[V] + protected def createKeyTypeInfo[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[K] - protected def objectClassTag: ClassTag[O] + protected def createValueDeserializer[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] - // TODO We currently not support schema evolution for keys - protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K] + protected def createValueTypeInfo[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] - protected def createKeyTypeInfo(kafkaConfig: KafkaConfig): TypeInformation[K] + protected def createObject[K: ClassTag, V: ClassTag](key: K, value: V, topic: String): O - protected def createValueDeserializer(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] + protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O] - protected def createValueTypeInfo(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] + override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig, + keySchemaDataOpt: Option[RuntimeSchemaData] = None, + valueSchemaDataOpt: Option[RuntimeSchemaData] + ): KafkaDeserializationSchema[Any] = { - protected def createObject(key: K, value: V, topic: String): O - - protected def createObjectTypeInformation(keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O] - - override def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T] = { - if (!classTag[T].runtimeClass.isAssignableFrom(objectClassTag.runtimeClass)) { - throw new IllegalArgumentException("Illegal input class: " + classTag[T].runtimeClass) - } - new KafkaDeserializationSchema[T] { + new KafkaDeserializationSchema[O] { @transient - private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig) + private lazy val keyDeserializer = createKeyDeserializer[K](keySchemaDataOpt, kafkaConfig) @transient - private lazy val valueDeserializer = createValueDeserializer(schemaDataOpt, kafkaConfig) + private lazy val valueDeserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig) - override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): T = { + override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): O = { val key = keyDeserializer.deserialize(consumerRecord.topic(), consumerRecord.key()) val value = valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value()) - val obj = createObject(key, value, consumerRecord.topic()) - obj.asInstanceOf[T] + val obj = createObject[K, V](key, value, consumerRecord.topic()) + obj.asInstanceOf[O] } - override def isEndOfStream(nextElement: T): Boolean = false + override def isEndOfStream(nextElement: O): Boolean = false - override def getProducedType: TypeInformation[T] = createObjectTypeInformation(createKeyTypeInfo(kafkaConfig), createValueTypeInfo(schemaDataOpt, kafkaConfig)).asInstanceOf[TypeInformation[T]] + override def getProducedType: TypeInformation[O] = + createObjectTypeInformation[K, V]( + createKeyTypeInfo[K](keySchemaDataOpt, kafkaConfig), + createValueTypeInfo[V](valueSchemaDataOpt, kafkaConfig) + ) } + .asInstanceOf[KafkaDeserializationSchema[Any]] } } diff --git 
a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala index 6be3f3a154c..eda8b13173b 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala @@ -42,7 +42,7 @@ class KafkaAvroSinkFactory(val schemaRegistryProvider: SchemaRegistryProvider, v //we cast here, since null will not be matched in case... val preparedTopic = prepareTopic(topic) val versionOption = parseVersionOption(version) - val schemaDeterminer = prepareSchemaDeterminer(preparedTopic, versionOption) + val schemaDeterminer = prepareValueSchemaDeterminer(preparedTopic, versionOption) val determinedSchema = schemaDeterminer.determineSchemaUsedInTyping .leftMap(SchemaDeterminerErrorHandler.handleSchemaRegistryError) .leftMap(NonEmptyList.one) @@ -76,7 +76,7 @@ class KafkaAvroSinkFactory(val schemaRegistryProvider: SchemaRegistryProvider, v val validationMode = extractValidationMode(params(KafkaAvroBaseTransformer.SinkValidationModeParameterName).asInstanceOf[String]) createSink(preparedTopic, versionOption, key, value, - kafkaConfig, schemaRegistryProvider.serializationSchemaFactory, prepareSchemaDeterminer(preparedTopic, versionOption), validationMode)( + kafkaConfig, schemaRegistryProvider.serializationSchemaFactory, prepareValueSchemaDeterminer(preparedTopic, versionOption), validationMode)( typedDependency[MetaData](dependencies), typedDependency[NodeId](dependencies)) } diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala index d7a7a9fab86..a1c9cac6c8f 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala @@ -42,7 +42,7 @@ class KafkaAvroSinkFactoryWithEditor(val schemaRegistryProvider: SchemaRegistryP ) => val preparedTopic = prepareTopic(topic) val versionOption = parseVersionOption(version) - val schemaDeterminer = prepareSchemaDeterminer(preparedTopic, versionOption) + val schemaDeterminer = prepareValueSchemaDeterminer(preparedTopic, versionOption) val determinedSchema = schemaDeterminer .determineSchemaUsedInTyping .leftMap(SchemaDeterminerErrorHandler.handleSchemaRegistryError(_)) @@ -81,7 +81,7 @@ class KafkaAvroSinkFactoryWithEditor(val schemaRegistryProvider: SchemaRegistryP val processMetaData = typedDependency[NodeId](dependencies) val clientId = s"${processMetaData.id}-${preparedTopic.prepared}" - val schemaDeterminer = prepareSchemaDeterminer(preparedTopic, versionOption) + val schemaDeterminer = prepareValueSchemaDeterminer(preparedTopic, versionOption) val schemaData = schemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException) val schemaUsedInRuntime = schemaDeterminer.toRuntimeSchema(schemaData) diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/BaseKafkaAvroSourceFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/BaseKafkaAvroSourceFactory.scala index f4915ce1e95..f12c46e61aa 100644 
--- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/BaseKafkaAvroSourceFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/BaseKafkaAvroSourceFactory.scala @@ -1,8 +1,8 @@ package pl.touk.nussknacker.engine.avro.source +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema import pl.touk.nussknacker.engine.api.MetaData import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId -import pl.touk.nussknacker.engine.api.test.TestParsingUtils import pl.touk.nussknacker.engine.api.typed.{ReturningType, typing} import pl.touk.nussknacker.engine.avro.serialization.KafkaAvroDeserializationSchemaFactory import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor @@ -24,29 +24,39 @@ abstract class BaseKafkaAvroSourceFactory[T: ClassTag](timestampAssigner: Option kafkaConfig: KafkaConfig, deserializationSchemaFactory: KafkaAvroDeserializationSchemaFactory, createRecordFormatter: RecordFormatter, - schemaDeterminer: AvroSchemaDeterminer, + keySchemaDeterminer: AvroSchemaDeterminer, + valueSchemaDeterminer: AvroSchemaDeterminer, returnGenericAvroType: Boolean) (implicit processMetaData: MetaData, nodeId: NodeId): KafkaSource[T] = { - val schemaData = schemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException) - val schemaUsedInRuntime = schemaDeterminer.toRuntimeSchema(schemaData) + // key schema + val keySchemaData = keySchemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException) + val keySchemaDataUsedInRuntime = keySchemaDeterminer.toRuntimeSchema(keySchemaData) + + // value schema + val valueSchemaData = valueSchemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException) + val valueSchemaUsedInRuntime = valueSchemaDeterminer.toRuntimeSchema(valueSchemaData) + + // prepare KafkaDeserializationSchema based on key and value schema + // TODO: add key-value deserialization as default scenario: create[K, V] + val deserializationSchema = deserializationSchemaFactory.create[Any, T](kafkaConfig, keySchemaDataUsedInRuntime, valueSchemaUsedInRuntime).asInstanceOf[KafkaDeserializationSchema[T]] if (returnGenericAvroType) { new KafkaSource( List(preparedTopic), kafkaConfig, - deserializationSchemaFactory.create[T](schemaUsedInRuntime, kafkaConfig), + deserializationSchema, assignerToUse(kafkaConfig), createRecordFormatter ) with ReturningType { - override def returnType: typing.TypingResult = AvroSchemaTypeDefinitionExtractor.typeDefinition(schemaData.schema) + override def returnType: typing.TypingResult = AvroSchemaTypeDefinitionExtractor.typeDefinition(valueSchemaData.schema) } } else { new KafkaSource( List(preparedTopic), kafkaConfig, - deserializationSchemaFactory.create[T](schemaUsedInRuntime, kafkaConfig), + deserializationSchema, assignerToUse(kafkaConfig), createRecordFormatter ) diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala index 7272587e69a..69854b17281 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala @@ -17,6 +17,7 @@ 
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermar import scala.reflect.ClassTag +//TODO: add key-value as default deserailization scenario class KafkaAvroSourceFactory[T:ClassTag](val schemaRegistryProvider: SchemaRegistryProvider, val processObjectDependencies: ProcessObjectDependencies, timestampAssigner: Option[TimestampWatermarkHandler[T]]) @@ -29,21 +30,28 @@ class KafkaAvroSourceFactory[T:ClassTag](val schemaRegistryProvider: SchemaRegis //we do casting here and not in case, as version can be null... val preparedTopic = prepareTopic(topic) val versionOption = parseVersionOption(version) - val schemaDeterminer = prepareSchemaDeterminer(preparedTopic, versionOption) - val validType = schemaDeterminer.determineSchemaUsedInTyping.map(schemaData => AvroSchemaTypeDefinitionExtractor.typeDefinition(schemaData.schema)) - val finalCtxValue = finalCtx(context, dependencies, validType.getOrElse(Unknown)) - val finalErrors = validType.swap.map(error => CustomNodeError(error.getMessage, Some(SchemaVersionParamName))).toList + + // key schema + // TODO: add key schema versioning + val keySchemaDeterminer = prepareKeySchemaDeterminer(preparedTopic) + val keyValidType = keySchemaDeterminer.determineSchemaUsedInTyping.map(schemaData => AvroSchemaTypeDefinitionExtractor.typeDefinition(schemaData.schema)) + // value schema + val valueSchemaDeterminer = prepareValueSchemaDeterminer(preparedTopic, versionOption) + val valueValidType = valueSchemaDeterminer.determineSchemaUsedInTyping.map(schemaData => AvroSchemaTypeDefinitionExtractor.typeDefinition(schemaData.schema)) + + val finalCtxValue = finalCtx(context, dependencies, keyValidType.getOrElse(Unknown), valueValidType.getOrElse(Unknown)) + val finalErrors = valueValidType.swap.map(error => CustomNodeError(error.getMessage, Some(SchemaVersionParamName))).toList FinalResults(finalCtxValue, finalErrors) //edge case - for some reason Topic/Version is not defined case TransformationStep((TopicParamName, _) :: (SchemaVersionParamName, _) ::Nil, _) => - FinalResults(finalCtx(context, dependencies, Unknown), Nil) + FinalResults(finalCtx(context, dependencies, Unknown, Unknown), Nil) } override def paramsDeterminedAfterSchema: List[Parameter] = Nil - private def finalCtx(context: ValidationContext, dependencies: List[NodeDependencyValue], result: typing.TypingResult)(implicit nodeId: NodeId): ValidationContext = { - context.withVariable(variableName(dependencies), result, None).getOrElse(context) + private def finalCtx(context: ValidationContext, dependencies: List[NodeDependencyValue], keyResult: typing.TypingResult, valueResult: typing.TypingResult)(implicit nodeId: NodeId): ValidationContext = { + context.withVariable(variableName(dependencies), valueResult, None).getOrElse(context) } private def variableName(dependencies: List[NodeDependencyValue]) = { @@ -55,9 +63,15 @@ class KafkaAvroSourceFactory[T:ClassTag](val schemaRegistryProvider: SchemaRegis override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[State]): FlinkSource[T] = { val preparedTopic = extractPreparedTopic(params) val version = extractVersionOption(params) - createSource(preparedTopic, kafkaConfig, schemaRegistryProvider.deserializationSchemaFactory, schemaRegistryProvider.recordFormatter, - prepareSchemaDeterminer(preparedTopic, version), returnGenericAvroType = true)( - typedDependency[MetaData](dependencies), typedDependency[NodeId](dependencies)) + createSource( + preparedTopic, + kafkaConfig, + 
schemaRegistryProvider.deserializationSchemaFactory, + schemaRegistryProvider.recordFormatter, + prepareKeySchemaDeterminer(preparedTopic), + prepareValueSchemaDeterminer(preparedTopic, version), + returnGenericAvroType = true + )(typedDependency[MetaData](dependencies), typedDependency[NodeId](dependencies)) } override def nodeDependencies: List[NodeDependency] = List(TypedNodeDependency(classOf[MetaData]), diff --git a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala index 5dc6d69c18f..2c072fc84b4 100644 --- a/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala +++ b/engine/flink/avro-util/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala @@ -6,6 +6,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId import pl.touk.nussknacker.engine.api.editor.{SimpleEditor, SimpleEditorType} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.api.{MetaData, MethodToInvoke, ParamName} +import pl.touk.nussknacker.engine.avro.FixedStringSchemaDeterminer import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.TopicParamName import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaRegistryProvider, SpecificRecordEmbeddedSchemaDeterminer} import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler @@ -27,8 +28,17 @@ class SpecificRecordKafkaAvroSourceFactory[T <: SpecificRecord: ClassTag](schema (implicit processMetaData: MetaData, nodeId: NodeId): KafkaSource[T] = { val kafkaConfig = KafkaConfig.parseProcessObjectDependencies(processObjectDependencies) val preparedTopic = KafkaUtils.prepareKafkaTopic(topic, processObjectDependencies) - val schemaDeterminer = new SpecificRecordEmbeddedSchemaDeterminer(classTag[T].runtimeClass.asInstanceOf[Class[_ <: SpecificRecord]]) - createSource(preparedTopic, kafkaConfig, schemaRegistryProvider.deserializationSchemaFactory, schemaRegistryProvider.recordFormatter, schemaDeterminer, returnGenericAvroType = false) + val valueSchemaDeterminer = new SpecificRecordEmbeddedSchemaDeterminer(classTag[T].runtimeClass.asInstanceOf[Class[_ <: SpecificRecord]]) + val keySchemaDeterminer = FixedStringSchemaDeterminer + createSource( + preparedTopic, + kafkaConfig, + schemaRegistryProvider.deserializationSchemaFactory, + schemaRegistryProvider.recordFormatter, + keySchemaDeterminer, + valueSchemaDeterminer, + returnGenericAvroType = false + ) } } \ No newline at end of file diff --git a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSpec.scala b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSpec.scala index 39768b44704..53970aeabb2 100644 --- a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSpec.scala +++ b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSpec.scala @@ -83,7 +83,7 @@ class ConfluentKafkaAvroDeserializationSpec extends SchemaRegistryMixin with Tab 
pushMessage(FullNameV1.record, fullNameTopic, Some(fromRecordTopic.input)) pushMessage(FullNameV1.record, fullNameTopic, Some(fromSubjectVersionTopic.input)) - val fromRecordDeserializer = avroSetup.provider.deserializationSchemaFactory.create(None, kafkaConfig) + val fromRecordDeserializer = avroSetup.provider.deserializationSchemaFactory.create(kafkaConfig, None, None) consumeAndVerifyMessages(fromRecordDeserializer, fromRecordTopic.input, List(FullNameV1.record)) @@ -91,7 +91,7 @@ class ConfluentKafkaAvroDeserializationSpec extends SchemaRegistryMixin with Tab val subject = ConfluentUtils.topicSubject(fromSubjectVersionTopic.input, fromSubjectVersionTopic.isKey) val schemaId = schemaRegistryClient.getId(subject, ConfluentUtils.convertToAvroSchema(PaymentV1.schema)) val schemaData = RuntimeSchemaData(PaymentV1.schema, Some(schemaId)) - avroSetup.provider.deserializationSchemaFactory.create(Some(schemaData), kafkaConfig) + avroSetup.provider.deserializationSchemaFactory.create(kafkaConfig, None, Some(schemaData)) } assertThrows[SerializationException] { @@ -111,7 +111,7 @@ class ConfluentKafkaAvroDeserializationSpec extends SchemaRegistryMixin with Tab } else { None } - val deserializer = setup.provider.deserializationSchemaFactory.create(schemaDataOpt, kafkaConfig) + val deserializer = setup.provider.deserializationSchemaFactory.create(kafkaConfig, None, schemaDataOpt) setup.pushMessage(givenObj, topicConfig.input) diff --git a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory.scala b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory.scala index 1693ae8c838..2a30b99b39a 100644 --- a/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory.scala +++ b/engine/flink/avro-util/src/test/scala/pl/touk/nussknacker/engine/avro/source/TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory.scala @@ -10,19 +10,14 @@ import scala.reflect._ class TupleAvroKeyValueKafkaAvroDeserializerSchemaFactory[Key: ClassTag, Value: ClassTag](schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory) extends ConfluentKeyValueKafkaAvroDeserializationFactory(schemaRegistryClientFactory) { - override protected type K = Key - override protected type V = Value - override protected type O = (K, V) + override protected type O = (Key, Value) - override protected def keyClassTag: ClassTag[Key] = classTag[Key] - override protected def valueClassTag: ClassTag[Value] = classTag[Value] - override protected def objectClassTag: ClassTag[O] = classTag[(K, V)] - - override protected def createObject(key: Key, value: Value, topic: String): (Key, Value) = { - (key, value) + override protected def createObject[K: ClassTag, V: ClassTag](key: K, value: V, topic: String): (Key, Value) = { + (key.asInstanceOf[Key], value.asInstanceOf[Value]) } - override protected def createObjectTypeInformation(keyTypeInformation: TypeInformation[Key], valueTypeInformation: TypeInformation[Value]): TypeInformation[(Key, Value)] = + override protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O] = createTuple2TypeInformation(keyTypeInformation, valueTypeInformation) + .asInstanceOf[TypeInformation[O]] }
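Taken together, the caller-visible change of this PR is the new shape of `KafkaAvroDeserializationSchemaFactory.create`: instead of a single element type and a value-only reader schema, it takes key and value ClassTags plus optional key and value reader schemas, and returns a `KafkaDeserializationSchema[Any]` that sources cast back to their element type. The sketch below only compares the before/after signatures; `OldFactory`, `NewFactory` and the trimmed-down `KafkaConfig`, `RuntimeSchemaData` and `KafkaDeserializationSchema` definitions are simplified stand-ins, not the real Nussknacker or Flink types.

import scala.reflect.ClassTag

// Trimmed-down stand-ins, only to make the signature comparison self-contained.
final case class KafkaConfig(kafkaAddress: String)
final case class RuntimeSchemaData(schemaString: String, schemaIdOpt: Option[Int])
trait KafkaDeserializationSchema[T]

// Shape before #1499: one type parameter, value reader schema only.
trait OldFactory {
  def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData],
                          kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T]
}

// Shape after #1499: key and value ClassTags, optional key and value reader schemas,
// with kafkaConfig moved to the front - hence the migration guide note about parameter order.
trait NewFactory {
  def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
                                       keySchemaDataOpt: Option[RuntimeSchemaData] = None,
                                       valueSchemaDataOpt: Option[RuntimeSchemaData]): KafkaDeserializationSchema[Any]
}

A source that previously called `factory.create[T](valueSchemaOpt, kafkaConfig)` now calls `factory.create[Any, T](kafkaConfig, keySchemaOpt, valueSchemaOpt)` and casts the result to `KafkaDeserializationSchema[T]`, exactly as `BaseKafkaAvroSourceFactory.createSource` does in this patch.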