ClassTag is provided in params in deserialization schema factory … (#1499)

* ClassTag is provided in params so one deserialization schema factory will work for multiple deserialization schemas

* Code review fixes

* Code review fixes - code formatting

* Migration guide.

* Add AvroSchemaDeterminer with fallback to fixed String schema.
gskrobisz authored Apr 13, 2021
1 parent 4c9c897 commit b7c0424
Showing 16 changed files with 177 additions and 102 deletions.
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -35,6 +35,7 @@ Nussknacker versions
* [#1405](https://github.com/TouK/nussknacker/pull/1405) 'KafkaAvroSinkFactoryWithEditor' for more user-friendly Avro message definition.
* [#1510](https://github.com/TouK/nussknacker/pull/1510) `FlinkSource` API allows creating a stream of `Context` (FlinkSource API and test support API refactoring).
* [#1497](https://github.com/TouK/nussknacker/pull/1497) Initial support for multiple (named) schedules in `PeriodicProcessManager`
* [#1499](https://github.com/TouK/nussknacker/pull/1499) ClassTag is now provided in params of the Avro key-value deserialization schema factory: `KafkaAvroKeyValueDeserializationSchemaFactory`

0.3.1 (not released yet)
------------------------
4 changes: 4 additions & 0 deletions docs/MigrationGuide.md
@@ -49,6 +49,10 @@ To see biggest differences please consult the [changelog](Changelog.md).
- For Kafka sources, `RecordFormatter` parses raw test data to a `ConsumerRecord` that fits the deserializer (instead of a `ProducerRecord`, which required another transformation).
- Definitions of names of common `Context` variables are moved to `VariableConstants` (instead of `Interpreter`).
* [#1497](https://github.com/TouK/nussknacker/pull/1497) Changes in `PeriodicProcessManager`, change `PeriodicProperty` to `ScheduleProperty`
* [#1499](https://github.com/TouK/nussknacker/pull/1499)
- The trait `KafkaAvroDeserializationSchemaFactory` now uses both key and value ClassTags and schemas (instead of value-only); check the order of parameters.
- ClassTag is now provided in params of the Avro key-value deserialization schema factory `KafkaAvroKeyValueDeserializationSchemaFactory` (see the sketch after this list).
- `BaseKafkaAvroSourceFactory` can use both key and value schema determiners to build a proper `DeserializationSchema` (full support for keys is not yet introduced in this change).
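
A minimal sketch of the signature change at a call site. This is not code from the commit: `factory` (a `KafkaAvroKeyValueDeserializationSchemaFactory` instance), `kafkaConfig` and `valueSchemaData` are assumed to be in scope, as is `org.apache.avro.generic.GenericRecord`.

// Before #1499: one type parameter, value schema only:
//   factory.create[GenericRecord](valueSchemaData, kafkaConfig)
// After #1499: key and value type parameters (each with its own ClassTag),
// kafkaConfig first, an optional key reader schema, then the value reader schema:
val deserializationSchema = factory.create[String, GenericRecord](
  kafkaConfig,
  keySchemaDataOpt = None,
  valueSchemaDataOpt = valueSchemaData
)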

## In version 0.3.0

@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine.avro

import cats.data.Validated
import cats.data.Validated.Valid
import org.apache.avro.Schema
import org.apache.flink.formats.avro.typeutils.NkSerializableAvroSchema

@@ -34,3 +35,8 @@ object RuntimeSchemaData {
}

class SchemaDeterminerError(message: String, cause: Throwable) extends RuntimeException(message, cause)

case object FixedStringSchemaDeterminer extends AvroSchemaDeterminer {
override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = Valid(RuntimeSchemaData(Schema.create(Schema.Type.STRING), None))
override def toRuntimeSchema(schemaUsedInTyping: RuntimeSchemaData): Option[RuntimeSchemaData] = None
}
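
A short usage sketch (not part of this commit) of how the added fallback determiner behaves: typing falls back to a plain Avro STRING schema, and no reader schema is forced at runtime, so the writer schema embedded in the consumed record is used.

// Hypothetical usage sketch
val typingSchema = FixedStringSchemaDeterminer.determineSchemaUsedInTyping
// Valid(RuntimeSchemaData(Schema.create(Schema.Type.STRING), None))
val runtimeSchema = FixedStringSchemaDeterminer.toRuntimeSchema(typingSchema.toOption.get)
// None - the writer schema from the consumed record is used instead
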
@@ -2,16 +2,15 @@ package pl.touk.nussknacker.engine.avro

import cats.data.Validated.{Invalid, Valid}
import cats.data.Writer
import pl.touk.nussknacker.engine.api.MetaData
import org.apache.avro.Schema
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CustomNodeError, NodeId}
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputGenericNodeTransformation, TypedNodeDependencyValue}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, NodeDependency, Parameter, TypedNodeDependency}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.TopicParamName
import pl.touk.nussknacker.engine.avro.schemaregistry.{BasedOnVersionAvroSchemaDeterminer, SchemaRegistryClient, SchemaRegistryProvider, SchemaVersionOption}
import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactoryWithEditor.paramsDeterminedAfterSchema
import pl.touk.nussknacker.engine.avro.schemaregistry._
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, PreparedKafkaTopic}

import scala.reflect.ClassTag
@@ -86,8 +85,14 @@ trait KafkaAvroBaseTransformer[T] extends SingleInputGenericNodeTransformation[T
protected def parseVersionOption(versionOptionName: String): SchemaVersionOption =
SchemaVersionOption.byName(versionOptionName)

protected def prepareSchemaDeterminer(preparedTopic: PreparedKafkaTopic, version: SchemaVersionOption): AvroSchemaDeterminer = {
new BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, version)
protected def prepareValueSchemaDeterminer(preparedTopic: PreparedKafkaTopic, version: SchemaVersionOption): AvroSchemaDeterminer = {
new BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, version, isKey = false)
}

//TODO: add schema versioning for key schemas
protected def prepareKeySchemaDeterminer(preparedTopic: PreparedKafkaTopic): AvroSchemaDeterminer = {
val fallbackSchema = Schema.create(Schema.Type.STRING)
new BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, LatestSchemaVersion, isKey = true, fallbackSchema)
}

protected def topicParamStep(implicit nodeId: NodeId): NodeTransformationDefinition = {
@@ -1,21 +1,44 @@
package pl.touk.nussknacker.engine.avro.schemaregistry

import cats.data.Validated
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, SchemaDeterminerError, RuntimeSchemaData}
import cats.data.Validated.Valid
import org.apache.avro.Schema
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError}

class BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient,
topic: String,
versionOption: SchemaVersionOption) extends AvroSchemaDeterminer {
versionOption: SchemaVersionOption,
isKey: Boolean) extends AvroSchemaDeterminer {

override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = {
val version = versionOption match {
case ExistingSchemaVersion(v) => Some(v)
case LatestSchemaVersion => None
}
schemaRegistryClient
.getFreshSchema(topic, version, isKey = false)
.getFreshSchema(topic, version, isKey = isKey)
.leftMap(err => new SchemaDeterminerError(s"Fetching schema error for topic: $topic, version: $versionOption", err))
.map(withMetadata => RuntimeSchemaData(withMetadata.schema, Some(withMetadata.id)))
}

}

class BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient,
topic: String,
versionOption: SchemaVersionOption,
isKey: Boolean,
schema: Schema
) extends AvroSchemaDeterminer {

override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = {
val version = versionOption match {
case ExistingSchemaVersion(v) => Some(v)
case LatestSchemaVersion => None
}
schemaRegistryClient
.getFreshSchema(topic, version, isKey = isKey)
.map(withMetadata => RuntimeSchemaData(withMetadata.schema, Some(withMetadata.id)))
.orElse(Valid(RuntimeSchemaData(schema, None)))
}

}
@@ -45,7 +45,11 @@ object ConfluentUtils extends LazyLogging {

def parsePayloadToByteBuffer(payload: Array[Byte]): Validated[IllegalArgumentException, ByteBuffer] = {
val buffer = ByteBuffer.wrap(payload)
if (buffer.get != MagicByte)
if (buffer.array().isEmpty)
// Here the parsed payload is an empty buffer. In that case `buffer.get` below would raise java.nio.BufferUnderflowException.
// This usually happens when the content of the key or value is null.
Validated.invalid(new IllegalArgumentException("Buffer is empty"))
else if (buffer.get != MagicByte)
Validated.invalid(new IllegalArgumentException("Unknown magic byte!"))
else
Validated.valid(buffer)
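
For context, a rough sketch of the Confluent wire format this guard inspects (illustrative only, not the project's code): one magic byte, a 4-byte schema id, then the Avro payload. An empty array — e.g. a null key or value — has no magic byte to read.

import java.nio.ByteBuffer

// Illustrative only: describes a Confluent-framed payload instead of validating it.
def describePayload(payload: Array[Byte]): String = {
  val buffer = ByteBuffer.wrap(payload)
  if (payload.isEmpty) "empty payload (typically a null key or value)" // buffer.get would throw BufferUnderflowException here
  else if (buffer.get() != 0x0) "unknown magic byte"                   // Confluent uses 0x0 as the magic byte
  else s"schema id = ${buffer.getInt()}, ${buffer.remaining()} bytes of Avro data"
}
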
@@ -1,12 +1,9 @@
package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization

import com.typesafe.scalalogging.LazyLogging
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.{LogicalTypesAvroTypeInfo, LogicalTypesGenericRecordAvroTypeInfo, LogicalTypesGenericRecordWithSchemaIdAvroTypeInfo}
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.avro.kryo.KryoGenericRecordSchemaIdSerializationSupport
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentUtils
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.serialization.{KafkaAvroKeyValueDeserializationSchemaFactory, KafkaAvroValueDeserializationSchemaFactory}
@@ -51,16 +48,16 @@ class ConfluentKafkaAvroDeserializationSchemaFactory(schemaRegistryClientFactory
abstract class ConfluentKeyValueKafkaAvroDeserializationFactory(schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory)
extends KafkaAvroKeyValueDeserializationSchemaFactory with ConfluentKafkaAvroDeserializerFactory {

override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K] =
createDeserializer[K](schemaRegistryClientFactory, kafkaConfig, None, isKey = true)(keyClassTag)
override protected def createKeyDeserializer[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[K] =
createDeserializer[K](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = true)

override protected def createKeyTypeInfo(kafkaConfig: KafkaConfig): TypeInformation[K] =
createTypeInfo[K](kafkaConfig, None)(keyClassTag)
override protected def createKeyTypeInfo[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[K] =
createTypeInfo[K](kafkaConfig, schemaDataOpt)

override protected def createValueDeserializer(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] =
createDeserializer[V](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = false)(valueClassTag)
override protected def createValueDeserializer[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] =
createDeserializer[V](schemaRegistryClientFactory, kafkaConfig, schemaDataOpt, isKey = false)

override protected def createValueTypeInfo(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] =
createTypeInfo[V](kafkaConfig, schemaDataOpt)(valueClassTag)
override protected def createValueTypeInfo[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] =
createTypeInfo[V](kafkaConfig, schemaDataOpt)

}
@@ -34,8 +34,8 @@ class ConfluentKafkaAvroDeserializer[T](kafkaConfig: KafkaConfig, schemaData: Ru
KryoGenericRecordSchemaIdSerializationSupport.schemaIdSerializationEnabled(kafkaConfig)

override def deserialize(topic: String, data: Array[Byte]): T = {
val record = deserialize(topic, isKey, data, schemaData)
record.asInstanceOf[T]
val deserializedData = deserialize(topic, isKey, data, schemaData)
deserializedData.asInstanceOf[T]
}

override def close(): Unit = {}
@@ -1,5 +1,7 @@
package pl.touk.nussknacker.engine.avro.serialization

import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -16,14 +18,20 @@ import scala.reflect._
trait KafkaAvroDeserializationSchemaFactory extends Serializable {

/**
* Prepare Flink's KafkaDeserializationSchema based on provided information.
* @param schemaDataOpt Schema to which will be used as a reader schema. In case of None, will be used the same schema as writer schema.
* @param kafkaConfig Configuration of integration with Kafka
* @tparam T Type that should be produced by deserialization schema. It is important parameter, because factory can
* use other deserialization strategy base on it or provide different TypeInformation
* @return KafkaDeserializationSchema
*/
def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T]
* Prepare Flink's KafkaDeserializationSchema based on provided information.
*
* @param kafkaConfig Configuration of integration with Kafka.
* @param keySchemaDataOpt Schema which will be used as a key reader schema.
* @param valueSchemaDataOpt Schema which will be used as a value reader schema. In case of None, the writer schema will be used.
* @tparam K Type that should be produced by the key deserialization schema.
* @tparam V Type that should be produced by the value deserialization schema. It is an important parameter, because the factory
*           can use a different deserialization strategy based on it or provide different TypeInformation.
* @return KafkaDeserializationSchema
*/
def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any]

}

@@ -38,20 +46,24 @@ abstract class KafkaAvroValueDeserializationSchemaFactory

protected def createValueTypeInfo[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[T]

override def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T] = {
new KafkaDeserializationSchema[T] {
override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {
new KafkaDeserializationSchema[V] {
@transient
private lazy val deserializer = createValueDeserializer[T](schemaDataOpt, kafkaConfig)
private lazy val deserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)

override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): T = {
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): V = {
val value = deserializer.deserialize(consumerRecord.topic(), consumerRecord.headers(), consumerRecord.value())
value
}

override def isEndOfStream(nextElement: T): Boolean = false
override def isEndOfStream(nextElement: V): Boolean = false

override def getProducedType: TypeInformation[T] = createValueTypeInfo(schemaDataOpt, kafkaConfig)
override def getProducedType: TypeInformation[V] = createValueTypeInfo(valueSchemaDataOpt, kafkaConfig)
}
.asInstanceOf[KafkaDeserializationSchema[Any]]
}

}
@@ -64,53 +76,47 @@ abstract class KafkaAvroKeyValueDeserializationSchemaFactory
abstract class KafkaAvroKeyValueDeserializationSchemaFactory
extends KafkaAvroDeserializationSchemaFactory {

protected type K

protected type V

protected type O

// TODO Make this provided in params so one deserialization schema factory will work for multiple deserialization schemas
protected def keyClassTag: ClassTag[K]
protected def createKeyDeserializer[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[K]

protected def valueClassTag: ClassTag[V]
protected def createKeyTypeInfo[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[K]

protected def objectClassTag: ClassTag[O]
protected def createValueDeserializer[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V]

// TODO We currently not support schema evolution for keys
protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K]
protected def createValueTypeInfo[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V]

protected def createKeyTypeInfo(kafkaConfig: KafkaConfig): TypeInformation[K]
protected def createObject[K: ClassTag, V: ClassTag](key: K, value: V, topic: String): O

protected def createValueDeserializer(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V]
protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O]

protected def createValueTypeInfo(schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V]
override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {

protected def createObject(key: K, value: V, topic: String): O

protected def createObjectTypeInformation(keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O]

override def create[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[T] = {
if (!classTag[T].runtimeClass.isAssignableFrom(objectClassTag.runtimeClass)) {
throw new IllegalArgumentException("Illegal input class: " + classTag[T].runtimeClass)
}
new KafkaDeserializationSchema[T] {
new KafkaDeserializationSchema[O] {
@transient
private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig)
private lazy val keyDeserializer = createKeyDeserializer[K](keySchemaDataOpt, kafkaConfig)
@transient
private lazy val valueDeserializer = createValueDeserializer(schemaDataOpt, kafkaConfig)
private lazy val valueDeserializer = createValueDeserializer[V](valueSchemaDataOpt, kafkaConfig)

override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): T = {
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): O = {
val key = keyDeserializer.deserialize(consumerRecord.topic(), consumerRecord.key())
val value = valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value())
val obj = createObject(key, value, consumerRecord.topic())
obj.asInstanceOf[T]
val obj = createObject[K, V](key, value, consumerRecord.topic())
obj.asInstanceOf[O]
}

override def isEndOfStream(nextElement: T): Boolean = false
override def isEndOfStream(nextElement: O): Boolean = false

override def getProducedType: TypeInformation[T] = createObjectTypeInformation(createKeyTypeInfo(kafkaConfig), createValueTypeInfo(schemaDataOpt, kafkaConfig)).asInstanceOf[TypeInformation[T]]
override def getProducedType: TypeInformation[O] =
createObjectTypeInformation[K, V](
createKeyTypeInfo[K](keySchemaDataOpt, kafkaConfig),
createValueTypeInfo[V](valueSchemaDataOpt, kafkaConfig)
)
}
.asInstanceOf[KafkaDeserializationSchema[Any]]
}

}
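
To illustrate the refactored contract, a minimal, hypothetical implementation of the key-value factory after this change (class name and TypeInformation choices are assumptions; the real Confluent-based implementation is ConfluentKeyValueKafkaAvroDeserializationFactory shown earlier). ClassTags now arrive as context bounds on each method instead of the removed keyClassTag/valueClassTag/objectClassTag members, while the produced object type O stays an abstract type member.

import scala.reflect.{ClassTag, classTag}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.avro.RuntimeSchemaData
import pl.touk.nussknacker.engine.avro.serialization.KafkaAvroKeyValueDeserializationSchemaFactory
import pl.touk.nussknacker.engine.kafka.KafkaConfig

// Hypothetical sketch: produces (key, value) tuples, with deserializer construction stubbed out.
class TupleKeyValueDeserializationSchemaFactory extends KafkaAvroKeyValueDeserializationSchemaFactory {

  override protected type O = (Any, Any)

  override protected def createKeyDeserializer[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[K] =
    ??? // e.g. a schema-registry-aware deserializer built for classTag[K]

  override protected def createKeyTypeInfo[K: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[K] =
    TypeInformation.of(classTag[K].runtimeClass.asInstanceOf[Class[K]])

  override protected def createValueDeserializer[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): Deserializer[V] =
    ??? // e.g. a schema-registry-aware deserializer built for classTag[V]

  override protected def createValueTypeInfo[V: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[V] =
    TypeInformation.of(classTag[V].runtimeClass.asInstanceOf[Class[V]])

  override protected def createObject[K: ClassTag, V: ClassTag](key: K, value: V, topic: String): O =
    (key, value)

  override protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O] =
    TypeInformation.of(classOf[(Any, Any)])
}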