From 65d38fb1f1376a7be84947555744a81d2a330246 Mon Sep 17 00:00:00 2001 From: Szymon Bogusz Date: Mon, 18 Nov 2024 11:37:18 +0100 Subject: [PATCH] Review fix - wip - created a function for repeated code - removed use of avro in test for json - used more constants where applicable --- .../helpers/KafkaAvroSpecMixin.scala | 3 - .../defaultmodel/FlinkWithKafkaSuite.scala | 2 - .../defaultmodel/KafkaJsonItSpec.scala | 111 ++++++++++++------ .../KafkaUniversalComponentTransformer.scala | 10 +- .../BasedOnVersionAvroSchemaDeterminer.scala | 49 +------- .../schemaregistry/ContentTypes.scala | 14 +++ .../schemaregistry/SchemaRegistryClient.scala | 6 + .../schemaregistry/SchemaVersionOption.scala | 13 -- .../AbstractSchemaBasedRecordFormatter.scala | 33 ++++-- .../universal/ParsedSchemaSupport.scala | 6 +- .../universal/RecordFormatterSupport.scala | 11 +- .../UniversalKafkaDeserializer.scala | 26 ++-- .../UniversalSchemaPayloadDeserializer.scala | 12 +- .../sink/UniversalKafkaSinkFactory.scala | 16 ++- .../source/UniversalKafkaSourceFactory.scala | 11 +- 15 files changed, 179 insertions(+), 144 deletions(-) create mode 100644 utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala diff --git a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala index 9da9ccf9398..31bf3049c0d 100644 --- a/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala +++ b/engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/helpers/KafkaAvroSpecMixin.scala @@ -39,7 +39,6 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.Universa import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ ExistingSchemaVersion, LatestSchemaVersion, - PassedContentType, SchemaRegistryClientFactory, SchemaVersionOption } @@ -175,7 +174,6 @@ trait KafkaAvroSpecMixin versionOption match { case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'" case ExistingSchemaVersion(version) => s"'$version'" - case PassedContentType(typ) => s"'$typ'" } protected def runAndVerifyResultSingleEvent( @@ -314,7 +312,6 @@ trait KafkaAvroSpecMixin versionOption match { case LatestSchemaVersion => SchemaVersionOption.LatestOptionName case ExistingSchemaVersion(v) => v.toString - case PassedContentType(typ) => typ.toString } } diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala index b72326ed841..248947348fe 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/FlinkWithKafkaSuite.scala @@ -41,7 +41,6 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSche import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ ExistingSchemaVersion, LatestSchemaVersion, - PassedContentType, SchemaRegistryClientFactory, SchemaVersionOption } @@ -179,7 +178,6 @@ abstract class FlinkWithKafkaSuite versionOption match { case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'" case ExistingSchemaVersion(version) =>
s"'$version'" - case PassedContentType(typ) => s"'$typ'" } protected def createAndRegisterAvroTopicConfig(name: String, schemas: List[Schema]): TopicConfig = diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala index 7ed18f6bb74..0ac8c3c5129 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/KafkaJsonItSpec.scala @@ -1,39 +1,36 @@ package pl.touk.nussknacker.defaultmodel -import com.typesafe.scalalogging.LazyLogging -import io.confluent.kafka.schemaregistry.ParsedSchema -import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema -import pl.touk.nussknacker.defaultmodel.SampleSchemas.RecordSchemaV2 +import io.circe.{Json, parser} +import org.apache.kafka.shaded.com.google.protobuf.ByteString import pl.touk.nussknacker.engine.api.process.TopicName.ForSource import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.build.ScenarioBuilder import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer -import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData} -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ContentTypes, SchemaId, SchemaWithMetadata} -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher +import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion -import pl.touk.nussknacker.test.PatientScalaFutures -import java.nio.ByteBuffer +import java.math.BigInteger +import java.nio.charset.StandardCharsets import java.time.Instant -import java.util -class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging { +class KafkaJsonItSpec extends FlinkWithKafkaSuite { - private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError( - Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"), - RecordSchemaV2 + private val jsonRecord = Json.obj( + "first" -> Json.fromString("Jan"), + "middle" -> Json.fromString("Tomek"), + "last" -> Json.fromString("Kowalski") ) - test("should read json message from kafka without provided schema") { - val inputTopic = "input-topic-without-schema" - val outputTopic = "output-topic-without-schema" + test("should round-trip json message without provided schema") { + + val inputTopic = "input-topic-without-schema-json" + val outputTopic = "output-topic-without-schema-json" kafkaClient.createTopic(inputTopic, 1) kafkaClient.createTopic(outputTopic, 1) - sendAsJson(givenMatchingAvroObjV2.toString, ForSource(inputTopic), Instant.now.toEpochMilli) + sendAsJson(jsonRecord.toString, ForSource(inputTopic), Instant.now.toEpochMilli) val process = ScenarioBuilder @@ -51,32 +48,68 @@ class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel, KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel, KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel, 
- KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic}'".spel, + KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel, KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel, KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel ) run(process) { - val processed = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head - - val schema = SchemaWithMetadata( - OpenAPIJsonSchema("""{"type": "object"}"""), - SchemaId.fromString(ContentTypes.JSON.toString) - ) - val runtimeSchema = new RuntimeSchemaData(new NkSerializableParsedSchema[ParsedSchema](schema.schema), None) - val response = - UniversalSchemaSupportDispatcher(kafkaConfig) - .forSchemaType("JSON") - .payloadDeserializer - .deserialize( - Some(runtimeSchema), - runtimeSchema, - ByteBuffer.wrap(processed.value()) - ) - .asInstanceOf[util.HashMap[String, String]] - - response.forEach((key, value) => givenMatchingAvroObjV2.get(key) shouldBe value) + val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head + val parsedOutput = parser + .parse(new String(outputRecord.value(), StandardCharsets.UTF_8)) + .fold(throw _, identity) + + parsedOutput shouldBe jsonRecord + } + } + + test("should round-trip plain message without provided schema") { + val inputTopic = "input-topic-without-schema-plain" + val outputTopic = "output-topic-without-schema-plain" + + kafkaClient.createTopic(inputTopic, 1) + kafkaClient.createTopic(outputTopic, 1) + val shortJsonInHex = "7b2261223a20357d" + val longJsonInHex = + "227b226669727374223a2022546f6d656b222c20226d6964646c65223a20224a616e222c20226c617374223a20224b6f77616c736b69227d22" + val byteString = ByteString.fromHex(shortJsonInHex).toByteArray + val big = new BigInteger(shortJsonInHex, 16).toByteArray + + val str = new String(byteString) + println(str) + println(byteString.mkString("Array(", ", ", ")")) + println(big.mkString("Array(", ", ", ")")) + + kafkaClient.sendRawMessage(inputTopic, Array.empty, byteString, timestamp = Instant.now.toEpochMilli) + val process = + ScenarioBuilder + .streaming("without-schema") + .parallelism(1) + .source( + "start", + "kafka", + KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"), + KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel + ) + .emptySink( + "end", + "kafka", + KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel, + KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel, + KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel, + KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel, + KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel, + KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel + ) + + run(process) { + val outputRecord = kafkaClient + .createConsumer() + .consumeWithConsumerRecord(outputTopic) + .take(1) + .head + outputRecord.value() shouldBe byteString } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala 
index e184235cfab..22dccf18bb8 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala @@ -117,8 +117,10 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid protected def getVersionOrContentTypeParam( preparedTopic: PreparedKafkaTopic[TN], )(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = { - val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient) - if (topicsWithSchema.exists(_.contains(preparedTopic.prepared.topicName.toUnspecialized))) { + if (schemaRegistryClient.isTopicWithSchema( + preparedTopic.prepared.topicName.toUnspecialized.name, + topicSelectionStrategy + )) { val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false) (versions match { case Valid(versions) => Writer[List[ProcessCompilationError], List[Integer]](Nil, versions) @@ -130,8 +132,8 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid }).map(getVersionParam) } else { val contentTypesValues = List( - FixedExpressionValue("'JSON'", "JSON"), - FixedExpressionValue("'PLAIN'", "PLAIN") + FixedExpressionValue(s"'${ContentTypes.JSON}'", s"${ContentTypes.JSON}"), + FixedExpressionValue(s"'${ContentTypes.PLAIN}'", s"${ContentTypes.PLAIN}") ) Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, contentTypesValues).map(contentTypes => diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala index fa685f41563..df4bfb67859 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/BasedOnVersionAvroSchemaDeterminer.scala @@ -6,8 +6,6 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import io.confluent.kafka.schemaregistry.avro.AvroSchema import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError} class BasedOnVersionAvroSchemaDeterminer( @@ -20,7 +18,7 @@ class BasedOnVersionAvroSchemaDeterminer( override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData[AvroSchema]] = { val version = versionOption match { case ExistingSchemaVersion(v) => Some(v) - case _ => None + case LatestSchemaVersion => None } schemaRegistryClient .getFreshSchema(topic, version, isKey = isKey) @@ -51,22 +49,10 @@ class ParsedSchemaDeterminer( ) { def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = { - versionOption match { - case ExistingSchemaVersion(v) => - val version = Some(v) - getTypedSchema(version) - case 
LatestSchemaVersion => - val version = None - getTypedSchema(version) - case PassedContentType(typ) => - getEmptyJsonSchema(typ) + val version = versionOption match { + case ExistingSchemaVersion(v) => Some(v) + case LatestSchemaVersion => None } - - } - - private def getTypedSchema( - version: Option[Int] - ): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = { schemaRegistryClient .getFreshSchema(topic, version, isKey = isKey) .leftMap(err => @@ -77,31 +63,4 @@ class ParsedSchemaDeterminer( ) } - private def getEmptyJsonSchema( - typ: ContentType - ): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = { - typ match { - case ContentTypes.JSON => - Valid( - RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema]( -// Input type in ad hoc or in sink for example is displayed based on this schema, empty makes it Unknown - OpenAPIJsonSchema( - "{}" - ) - ), - Some(SchemaId.fromString(ContentTypes.JSON.toString)) - ) - ) - case ContentTypes.PLAIN => - Valid( - RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")), - Some(SchemaId.fromString(ContentTypes.PLAIN.toString)) - ) - ) - case _ => Invalid(new SchemaDeterminerError("Wrong dynamic type", SchemaError.apply("Wrong dynamic type"))) - } - } - } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala new file mode 100644 index 00000000000..bb26fe080bb --- /dev/null +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/ContentTypes.scala @@ -0,0 +1,14 @@ +package pl.touk.nussknacker.engine.schemedkafka.schemaregistry + +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema + +object ContentTypes extends Enumeration { + type ContentType = Value + + val JSON, PLAIN = Value +} + +object ContentTypesSchemas { + val schemaForJson: OpenAPIJsonSchema = OpenAPIJsonSchema("{}") + val schemaForPlain: OpenAPIJsonSchema = OpenAPIJsonSchema("") +} diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala index 5f559b2b07e..6068b77fcd6 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry import cats.data.Validated import io.confluent.kafka.schemaregistry.ParsedSchema import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName +import pl.touk.nussknacker.engine.schemedkafka.TopicSelectionStrategy trait SchemaRegistryClient extends Serializable { @@ -39,6 +40,11 @@ trait SchemaRegistryClient extends Serializable { def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]] + def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy): Boolean = { + val topicsWithSchema = strategy.getTopics(this) + topicsWithSchema.exists(_.map(_.name).contains(topic)) + 
} + } // This trait is mainly for testing mechanism purpose - in production implementation we assume that all schemas diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala index 5f4165dca01..7d03cc90624 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaVersionOption.scala @@ -1,6 +1,5 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType import pl.touk.nussknacker.engine.util.convert.IntValue sealed trait SchemaVersionOption @@ -8,15 +7,11 @@ sealed trait SchemaVersionOption object SchemaVersionOption { val LatestOptionName = "latest" - val JsonOptionName = "Json" - val PlainOptionName = "Plain" def byName(name: String): SchemaVersionOption = { name match { case `LatestOptionName` => LatestSchemaVersion case IntValue(version) => ExistingSchemaVersion(version) - case `JsonOptionName` => PassedContentType(ContentTypes.JSON) - case `PlainOptionName` => PassedContentType(ContentTypes.PLAIN) case _ => throw new IllegalArgumentException(s"Unexpected schema version option: $name") } } @@ -26,11 +21,3 @@ object SchemaVersionOption { case class ExistingSchemaVersion(version: Int) extends SchemaVersionOption case object LatestSchemaVersion extends SchemaVersionOption - -case class PassedContentType(typ: ContentType) extends SchemaVersionOption - -object ContentTypes extends Enumeration { - type ContentType = Value - - val JSON, PLAIN = Value -} diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala index d63fbe4790f..abe88d41f5d 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala @@ -7,10 +7,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.api.test.TestRecord import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord -import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, UnspecializedTopicName, serialization} -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema +import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, serialization} +import pl.touk.nussknacker.engine.schemedkafka.AllTopicsSelectionStrategy import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ ContentTypes, + ContentTypesSchemas, SchemaId, SchemaIdFromMessageExtractor, SchemaRegistryClient, @@ -111,19 +112,29 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte .getOrElse(throw new IllegalArgumentException("Error 
reading key schema: expected valid avro key")) } - if (schemaRegistryClient.getAllTopics.exists(_.contains(UnspecializedTopicName(topic.name)))) { + if (schemaRegistryClient.isTopicWithSchema(topic.name, new AllTopicsSelectionStrategy)) { val valueSchemaOpt = record.valueSchemaId.map(schemaRegistryClient.getSchemaById).map(_.schema) val valueBytes = readValueMessage(valueSchemaOpt, topic, value) (keyBytes, valueBytes) } else { - val valueSchemaOpt = - Option( - SchemaWithMetadata( - OpenAPIJsonSchema("""{"type": "object"}"""), - SchemaId.fromString(ContentTypes.JSON.toString) - ).schema - ) - val valueBytes = readValueMessage(valueSchemaOpt, topic, value) + val schema = record.valueSchemaId.flatMap { + case StringSchemaId(contentType) => + if (contentType.equals(ContentTypes.JSON.toString)) { + Some( + SchemaWithMetadata( + ContentTypesSchemas.schemaForJson, + SchemaId.fromString(ContentTypes.JSON.toString) + ).schema + ) + } else if (contentType.equals(ContentTypes.PLAIN.toString)) { + None + } else + throw new IllegalStateException("Schemaless topic should have json or plain content type, got neither") + case _ => + throw new IllegalStateException("Schemaless topic should have json or plain content type, got neither") + + } + val valueBytes = readValueMessage(schema, topic, value) (keyBytes, valueBytes) } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala index 91c455b007d..22946763132 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala @@ -161,9 +161,11 @@ object JsonSchemaSupport extends ParsedSchemaSupport[OpenAPIJsonSchema] { (value: Any) => { // In ad-hoc test without schema we create object `{ "Value" = userInputInAdHoc }`, so if present we should just take the input Try { - val (key, values) = value.asInstanceOf[Map[String, Map[String, Any]]].head + val temp = value.asInstanceOf[Map[String, Map[String, Any]]].head + val key = temp._1 + // Any try to create a variable with value temp._2 fails if (key.equals("Value")) { - values + temp._2 } else Failure } match { // For normal usage diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala index 8589b63648b..d1bc6dcbeba 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/RecordFormatterSupport.scala @@ -5,7 +5,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import io.confluent.kafka.schemaregistry.avro.AvroSchema import pl.touk.nussknacker.engine.api.process.TopicName import pl.touk.nussknacker.engine.kafka.KafkaConfig -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaRegistryClient +import 
pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ContentTypesSchemas, SchemaRegistryClient} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.{AvroMessageFormatter, AvroMessageReader} import pl.touk.nussknacker.engine.util.Implicits._ import pl.touk.nussknacker.engine.util.json.ToJsonEncoder @@ -50,8 +50,13 @@ object JsonPayloadRecordFormatterSupport extends RecordFormatterSupport { private def readMessage(topic: TopicName.ForSource, schemaOpt: Option[ParsedSchema], jsonObj: Json): Array[Byte] = jsonObj match { // we handle strings this way because we want to keep result value compact and JString is formatted in quotes - case j if j.isString => j.asString.get.getBytes(StandardCharsets.UTF_8) - case other => other.noSpaces.getBytes(StandardCharsets.UTF_8) + case j if j.isString => + schemaOpt match { + case None => j.asString.get.getBytes() + case Some(ContentTypesSchemas.schemaForJson) => j.asString.get.getBytes(StandardCharsets.UTF_8) + case _ => j.asString.get.getBytes(StandardCharsets.UTF_8) + } + case other => other.noSpaces.getBytes(StandardCharsets.UTF_8) } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala index 7e6fbda228f..438542acb7d 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala @@ -4,16 +4,17 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema import org.apache.kafka.common.header.Headers import org.apache.kafka.common.serialization.Deserializer -import pl.touk.nussknacker.engine.kafka.{KafkaConfig, UnspecializedTopicName} -import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema +import pl.touk.nussknacker.engine.kafka.KafkaConfig +import pl.touk.nussknacker.engine.schemedkafka.{AllTopicsSelectionStrategy, RuntimeSchemaData} import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.SchemaRegistryBasedDeserializerFactory import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ ChainedSchemaIdFromMessageExtractor, ContentTypes, + ContentTypesSchemas, SchemaId, SchemaRegistryClient, - SchemaWithMetadata + SchemaWithMetadata, + StringSchemaId } import scala.reflect.ClassTag @@ -41,14 +42,19 @@ class UniversalKafkaDeserializer[T]( .getOrElse(throw MessageWithoutSchemaIdException) val schemaWithMetadata = { - if (schemaRegistryClient.getAllTopics.exists(_.contains(UnspecializedTopicName(topic)))) { + if (schemaRegistryClient.isTopicWithSchema(topic, new AllTopicsSelectionStrategy)) { schemaRegistryClient.getSchemaById(writerSchemaId.value) } else { - SchemaWithMetadata( - // I don't know how these schemas affect deserialization later - OpenAPIJsonSchema("""{"type": "object"}"""), - SchemaId.fromString(ContentTypes.JSON.toString) - ) + writerSchemaId.value match { + case StringSchemaId(value) => + if (value.equals(ContentTypes.PLAIN.toString)) { + SchemaWithMetadata(ContentTypesSchemas.schemaForPlain, 
SchemaId.fromString(ContentTypes.PLAIN.toString)) + } else { + SchemaWithMetadata(ContentTypesSchemas.schemaForJson, SchemaId.fromString(ContentTypes.JSON.toString)) + } + case _ => + throw new IllegalStateException("Topic without schema should have ContentType Json or Plain, was neither") + } } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala index 287c3ed70d0..aaa7e81c7fe 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaPayloadDeserializer.scala @@ -7,6 +7,7 @@ import org.apache.avro.io.DecoderFactory import pl.touk.nussknacker.engine.kafka.KafkaConfig import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData import pl.touk.nussknacker.engine.schemedkafka.schema.{AvroRecordDeserializer, DatumReaderWriterMixin} +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypesSchemas import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.jsonpayload.JsonPayloadToAvroConverter import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.GenericRecordSchemaIdSerializationSupport @@ -75,10 +76,17 @@ object JsonSchemaPayloadDeserializer extends UniversalSchemaPayloadDeserializer buffer: ByteBuffer ): Any = { val jsonSchema = - expectedSchemaData.getOrElse(writerSchemaData).asInstanceOf[RuntimeSchemaData[OpenAPIJsonSchema]].schema + expectedSchemaData + .getOrElse(writerSchemaData) + .asInstanceOf[RuntimeSchemaData[OpenAPIJsonSchema]] + .schema val bytes = new Array[Byte](buffer.remaining()) buffer.get(bytes) - jsonSchema.deserializer.deserialize(bytes) + if (jsonSchema.equals(ContentTypesSchemas.schemaForPlain)) { + bytes + } else { + jsonSchema.deserializer.deserialize(bytes) + } } } diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala index 1b0e721eb74..cfcb6745a7d 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala @@ -19,8 +19,12 @@ import pl.touk.nussknacker.engine.api.validation.ValidationMode import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId, Params} import pl.touk.nussknacker.engine.graph.expression.Expression import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._ -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ + ContentTypes, + ContentTypesSchemas, + SchemaBasedSerdeProvider, + 
SchemaRegistryClientFactory +} import pl.touk.nussknacker.engine.schemedkafka.sink.UniversalKafkaSinkFactory.TransformationState import pl.touk.nussknacker.engine.schemedkafka.{ KafkaUniversalComponentTransformer, @@ -74,7 +78,7 @@ class UniversalKafkaSinkFactory( ) private val jsonSchema = RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("{}")), + new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForJson), None ) @@ -164,7 +168,11 @@ class UniversalKafkaSinkFactory( (`sinkValueParamName`, value: BaseDefinedParameter) :: Nil, _ ) => - val runtimeSchemaData = jsonSchema + val runtimeSchemaData = if (contentType.equals(ContentTypes.JSON.toString)) { + jsonSchema + } else { + jsonSchema.copy(new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain)) + } schemaSupportDispatcher .forSchemaType(runtimeSchemaData.schema.schemaType()) .extractParameter( diff --git a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala index 96c5bd9c5e5..b8aa14138ca 100644 --- a/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala +++ b/utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/UniversalKafkaSourceFactory.scala @@ -26,12 +26,11 @@ import pl.touk.nussknacker.engine.api.test.TestRecord import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypingResult, Unknown} import pl.touk.nussknacker.engine.api.{MetaData, NodeId, Params} import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord -import pl.touk.nussknacker.engine.kafka.{PreparedKafkaTopic, UnspecializedTopicName} +import pl.touk.nussknacker.engine.kafka.PreparedKafkaTopic import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.{KafkaSourceImplFactory, KafkaTestParametersInfo} import pl.touk.nussknacker.engine.kafka.source._ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.schemaVersionParamName import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._ -import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.SchemaBasedSerializableConsumerRecord import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupport import pl.touk.nussknacker.engine.schemedkafka.source.UniversalKafkaSourceFactory._ @@ -72,12 +71,12 @@ class UniversalKafkaSourceFactory( _ ) => val preparedTopic = prepareTopic(topic) - val valueValidationResult = if (contentType.equals("JSON")) { + val valueValidationResult = if (contentType.equals(ContentTypes.JSON.toString)) { Valid( ( Some( RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("{}")), + new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForJson), Some(SchemaId.fromString(ContentTypes.JSON.toString)) ) ), @@ -90,13 +89,13 @@ class UniversalKafkaSourceFactory( ( Some( RuntimeSchemaData[ParsedSchema]( - new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")), + new NkSerializableParsedSchema[ParsedSchema](ContentTypesSchemas.schemaForPlain), Some(SchemaId.fromString(ContentTypes.PLAIN.toString)) ) ), 
// This is the type after it leaves source // TODO: Should be Array[Byte] when handling is implemented - Unknown + Typed[Array[java.lang.Byte]] ) ) }
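The deserializer, record formatter, sink and source in this patch all share the same lookup pattern for schemaless topics: use the registered schema when the topic is known to the schema registry, otherwise fall back to the synthetic JSON or PLAIN content-type schema. Below is a minimal sketch of that pattern, assuming the signatures introduced in this patch (isTopicWithSchema, ContentTypes, ContentTypesSchemas, StringSchemaId); the resolveValueSchema helper is hypothetical and only illustrates the shared logic, it is not part of the change.

import pl.touk.nussknacker.engine.schemedkafka.AllTopicsSelectionStrategy
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
  ContentTypes,
  ContentTypesSchemas,
  SchemaId,
  SchemaRegistryClient,
  SchemaWithMetadata,
  StringSchemaId
}

// Illustrative only: resolve the schema metadata used for value deserialization.
// Topics registered in the schema registry keep their real schema; schemaless
// topics fall back to the synthetic JSON / PLAIN content-type schemas.
def resolveValueSchema(
    client: SchemaRegistryClient,
    topic: String,
    writerSchemaId: SchemaId
): SchemaWithMetadata =
  if (client.isTopicWithSchema(topic, new AllTopicsSelectionStrategy)) {
    // Topic has a registered schema - look it up by the id carried in the message.
    client.getSchemaById(writerSchemaId)
  } else {
    // Schemaless topic - the "schema id" is really a content-type marker.
    writerSchemaId match {
      case StringSchemaId(contentType) if contentType == ContentTypes.PLAIN.toString =>
        SchemaWithMetadata(ContentTypesSchemas.schemaForPlain, SchemaId.fromString(ContentTypes.PLAIN.toString))
      case StringSchemaId(contentType) if contentType == ContentTypes.JSON.toString =>
        SchemaWithMetadata(ContentTypesSchemas.schemaForJson, SchemaId.fromString(ContentTypes.JSON.toString))
      case other =>
        throw new IllegalStateException(s"Schemaless topic $topic should use JSON or PLAIN content type, got $other")
    }
  }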