Skip to content

Commit

Permalink
Add AvroSchemaDeterminer with fallback to fixed String schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
gskrobisz committed Apr 13, 2021
1 parent ee9aea7 commit 2c87706
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object RuntimeSchemaData {

class SchemaDeterminerError(message: String, cause: Throwable) extends RuntimeException(message, cause)

case object FixedNoneSchemaDeterminer extends AvroSchemaDeterminer {
override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = Valid(RuntimeSchemaData(Schema.create(Schema.Type.NULL), None))
case object FixedStringSchemaDeterminer extends AvroSchemaDeterminer {
override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = Valid(RuntimeSchemaData(Schema.create(Schema.Type.STRING), None))
override def toRuntimeSchema(schemaUsedInTyping: RuntimeSchemaData): Option[RuntimeSchemaData] = None
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package pl.touk.nussknacker.engine.avro

import cats.data.Validated.{Invalid, Valid}
import cats.data.Writer
import org.apache.avro.Schema
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.{CustomNodeError, NodeId}
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue, SingleInputGenericNodeTransformation, TypedNodeDependencyValue}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, Parameter}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.TopicParamName
import pl.touk.nussknacker.engine.avro.schemaregistry.{BasedOnVersionAvroSchemaDeterminer, LatestSchemaVersion, SchemaRegistryClient, SchemaRegistryProvider, SchemaVersionOption}
import pl.touk.nussknacker.engine.avro.schemaregistry._
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, PreparedKafkaTopic}

import scala.reflect.ClassTag
Expand Down Expand Up @@ -90,7 +91,8 @@ trait KafkaAvroBaseTransformer[T] extends SingleInputGenericNodeTransformation[T

//TODO: add schema versioning for key schemas
protected def prepareKeySchemaDeterminer(preparedTopic: PreparedKafkaTopic): AvroSchemaDeterminer = {
new BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, LatestSchemaVersion, isKey = true)
val fallbackSchema = Schema.create(Schema.Type.STRING)
new BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient, preparedTopic.prepared, LatestSchemaVersion, isKey = true, fallbackSchema)
}

protected def topicParamStep(implicit nodeId: NodeId): NodeTransformationDefinition = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package pl.touk.nussknacker.engine.avro.schemaregistry

import cats.data.Validated
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, SchemaDeterminerError, RuntimeSchemaData}
import cats.data.Validated.Valid
import org.apache.avro.Schema
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError}

class BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient,
topic: String,
Expand All @@ -20,3 +22,23 @@ class BasedOnVersionAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryCli
}

}

class BasedOnVersionWithFallbackAvroSchemaDeterminer(schemaRegistryClient: SchemaRegistryClient,
topic: String,
versionOption: SchemaVersionOption,
isKey: Boolean,
schema: Schema
) extends AvroSchemaDeterminer {

override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData] = {
val version = versionOption match {
case ExistingSchemaVersion(v) => Some(v)
case LatestSchemaVersion => None
}
schemaRegistryClient
.getFreshSchema(topic, version, isKey = isKey)
.map(withMetadata => RuntimeSchemaData(withMetadata.schema, Some(withMetadata.id)))
.orElse(Valid(RuntimeSchemaData(schema, None)))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,6 @@ trait KafkaAvroDeserializationSchemaFactory extends Serializable {

}

object KafkaAvroKeyValueDeserializationSchemaFactory {

val fallbackKeyAsStringDeserializer: Deserializer[String] = new Deserializer[String] {
override def deserialize(topic: String, data: Array[Byte]): String = {
Option(data).map(bytes => new String(bytes, StandardCharsets.UTF_8)).orNull
}
}

val fallbackKeyAsStringTypeInformation: TypeInformation[String] = TypeInformation.of(classOf[String])
}

/**
* Abstract base implementation of [[KafkaAvroDeserializationSchemaFactory]]
* which uses Kafka's Deserializer in returned Flink's KeyedDeserializationSchema. It deserializes only value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.typed.{ReturningType, typing}
import pl.touk.nussknacker.engine.avro.serialization.KafkaAvroDeserializationSchemaFactory
import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, FixedNoneSchemaDeterminer, SchemaDeterminerErrorHandler}
import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, SchemaDeterminerErrorHandler}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{LegacyTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.timestamp.BoundedOutOfOrderPreviousElementAssigner
Expand All @@ -31,9 +31,7 @@ abstract class BaseKafkaAvroSourceFactory[T: ClassTag](timestampAssigner: Option
nodeId: NodeId): KafkaSource[T] = {

// key schema
// TODO: add better support for no-key events with key-value deserialization
val keySchemaData = keySchemaDeterminer.determineSchemaUsedInTyping
.getOrElse(FixedNoneSchemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException))
val keySchemaData = keySchemaDeterminer.determineSchemaUsedInTyping.valueOr(SchemaDeterminerErrorHandler.handleSchemaRegistryErrorAndThrowException)
val keySchemaDataUsedInRuntime = keySchemaDeterminer.toRuntimeSchema(keySchemaData)

// value schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.NodeId
import pl.touk.nussknacker.engine.api.editor.{SimpleEditor, SimpleEditorType}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.api.{MetaData, MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.avro.FixedNoneSchemaDeterminer
import pl.touk.nussknacker.engine.avro.FixedStringSchemaDeterminer
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.TopicParamName
import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaRegistryProvider, SpecificRecordEmbeddedSchemaDeterminer}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
Expand All @@ -29,7 +29,7 @@ class SpecificRecordKafkaAvroSourceFactory[T <: SpecificRecord: ClassTag](schema
val kafkaConfig = KafkaConfig.parseProcessObjectDependencies(processObjectDependencies)
val preparedTopic = KafkaUtils.prepareKafkaTopic(topic, processObjectDependencies)
val valueSchemaDeterminer = new SpecificRecordEmbeddedSchemaDeterminer(classTag[T].runtimeClass.asInstanceOf[Class[_ <: SpecificRecord]])
val keySchemaDeterminer = FixedNoneSchemaDeterminer
val keySchemaDeterminer = FixedStringSchemaDeterminer
createSource(
preparedTopic,
kafkaConfig,
Expand Down

0 comments on commit 2c87706

Please sign in to comment.