Review fix - wip
- created a function for repeated code
- removed use of avro in test for json
- used more constants where applicable

Szymon Bogusz committed Nov 18, 2024
1 parent 4ef3b59 commit 65d38fb
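
In short: the repeated "does this topic have a registered schema" lookup moved into a single helper on SchemaRegistryClient, the JSON round-trip test now builds its payload with circe instead of encoding an Avro record, and the hard-coded "JSON"/"PLAIN" literals are read from the ContentTypes enum. A condensed before/after of the extraction, taken from the diff below:

// Before: each call site repeated the lookup inline
val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient)
topicsWithSchema.exists(_.contains(preparedTopic.prepared.topicName.toUnspecialized))

// After: one shared helper on SchemaRegistryClient
def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy): Boolean = {
  val topicsWithSchema = strategy.getTopics(this)
  topicsWithSchema.exists(_.map(_.name).contains(topic))
}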
Showing 15 changed files with 179 additions and 144 deletions.
@@ -39,7 +39,6 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.Universa
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ExistingSchemaVersion,
LatestSchemaVersion,
PassedContentType,
SchemaRegistryClientFactory,
SchemaVersionOption
}
@@ -175,7 +174,6 @@ trait KafkaAvroSpecMixin
versionOption match {
case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'"
case ExistingSchemaVersion(version) => s"'$version'"
case PassedContentType(typ) => s"'$typ'"
}

protected def runAndVerifyResultSingleEvent(
@@ -314,7 +312,6 @@ trait KafkaAvroSpecMixin
versionOption match {
case LatestSchemaVersion => SchemaVersionOption.LatestOptionName
case ExistingSchemaVersion(v) => v.toString
case PassedContentType(typ) => typ.toString
}
}

@@ -41,7 +41,6 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.MockSche
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ExistingSchemaVersion,
LatestSchemaVersion,
PassedContentType,
SchemaRegistryClientFactory,
SchemaVersionOption
}
@@ -179,7 +178,6 @@ abstract class FlinkWithKafkaSuite
versionOption match {
case LatestSchemaVersion => s"'${SchemaVersionOption.LatestOptionName}'"
case ExistingSchemaVersion(version) => s"'$version'"
case PassedContentType(typ) => s"'$typ'"
}

protected def createAndRegisterAvroTopicConfig(name: String, schemas: List[Schema]): TopicConfig =
@@ -1,39 +1,36 @@
package pl.touk.nussknacker.defaultmodel

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.defaultmodel.SampleSchemas.RecordSchemaV2
import io.circe.{Json, parser}
import org.apache.kafka.shaded.com.google.protobuf.ByteString
import pl.touk.nussknacker.engine.api.process.TopicName.ForSource
import pl.touk.nussknacker.engine.api.validation.ValidationMode
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.kafka.KafkaTestUtils.richConsumer
import pl.touk.nussknacker.engine.schemedkafka.{KafkaUniversalComponentTransformer, RuntimeSchemaData}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{ContentTypes, SchemaId, SchemaWithMetadata}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaSupportDispatcher
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes
import pl.touk.nussknacker.engine.spel.SpelExtension.SpelExpresion
import pl.touk.nussknacker.test.PatientScalaFutures

import java.nio.ByteBuffer
import java.math.BigInteger
import java.nio.charset.StandardCharsets
import java.time.Instant
import java.util

class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with LazyLogging {
class KafkaJsonItSpec extends FlinkWithKafkaSuite {

private val givenMatchingAvroObjV2 = avroEncoder.encodeRecordOrError(
Map("first" -> "Jan", "middle" -> "Tomek", "last" -> "Kowalski"),
RecordSchemaV2
private val jsonRecord = Json.obj(
"first" -> Json.fromString("Jan"),
"middle" -> Json.fromString("Tomek"),
"last" -> Json.fromString("Kowalski")
)

test("should read json message from kafka without provided schema") {
val inputTopic = "input-topic-without-schema"
val outputTopic = "output-topic-without-schema"
test("should round-trip json message without provided schema") {

val inputTopic = "input-topic-without-schema-json"
val outputTopic = "output-topic-without-schema-json"

kafkaClient.createTopic(inputTopic, 1)
kafkaClient.createTopic(outputTopic, 1)
sendAsJson(givenMatchingAvroObjV2.toString, ForSource(inputTopic), Instant.now.toEpochMilli)
sendAsJson(jsonRecord.toString, ForSource(inputTopic), Instant.now.toEpochMilli)

val process =
ScenarioBuilder
@@ -51,32 +48,68 @@ class KafkaJsonItSpec extends FlinkWithKafkaSuite with PatientScalaFutures with
KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'${outputTopic}'".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel,
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.JSON.toString}'".spel,
KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
)

run(process) {
val processed = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head

val schema = SchemaWithMetadata(
OpenAPIJsonSchema("""{"type": "object"}"""),
SchemaId.fromString(ContentTypes.JSON.toString)
)
val runtimeSchema = new RuntimeSchemaData(new NkSerializableParsedSchema[ParsedSchema](schema.schema), None)
val response =
UniversalSchemaSupportDispatcher(kafkaConfig)
.forSchemaType("JSON")
.payloadDeserializer
.deserialize(
Some(runtimeSchema),
runtimeSchema,
ByteBuffer.wrap(processed.value())
)
.asInstanceOf[util.HashMap[String, String]]

response.forEach((key, value) => givenMatchingAvroObjV2.get(key) shouldBe value)
val outputRecord = kafkaClient.createConsumer().consumeWithConsumerRecord(outputTopic).take(1).head
val parsedOutput = parser
.parse(new String(outputRecord.value(), StandardCharsets.UTF_8))
.fold(throw _, identity)

parsedOutput shouldBe jsonRecord
}
}
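
A note on the assertion above: circe's Json equality is structural, so the check does not depend on the field order the sink happens to emit — a minimal illustration:

import io.circe.parser

// structural equality: key order in the serialized form is irrelevant
val a = parser.parse("""{"first":"Jan","last":"Kowalski"}""").fold(throw _, identity)
val b = parser.parse("""{"last":"Kowalski","first":"Jan"}""").fold(throw _, identity)
assert(a == b)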

test("should round-trip plain message without provided schema") {
val inputTopic = "input-topic-without-schema-plain"
val outputTopic = "output-topic-without-schema-plain"

kafkaClient.createTopic(inputTopic, 1)
kafkaClient.createTopic(outputTopic, 1)
val shortJsonInHex = "7b2261223a20357d"
val longJsonInHex =
"227b226669727374223a2022546f6d656b222c20226d6964646c65223a20224a616e222c20226c617374223a20224b6f77616c736b69227d22"
val byteString = ByteString.fromHex(shortJsonInHex).toByteArray
val big = new BigInteger(shortJsonInHex, 16).toByteArray

val str = new String(byteString)
println(str)
println(byteString.mkString("Array(", ", ", ")"))
println(big.mkString("Array(", ", ", ")"))

kafkaClient.sendRawMessage(inputTopic, Array.empty, byteString, timestamp = Instant.now.toEpochMilli)
val process =
ScenarioBuilder
.streaming("without-schema")
.parallelism(1)
.source(
"start",
"kafka",
KafkaUniversalComponentTransformer.topicParamName.value -> Expression.spel(s"'$inputTopic'"),
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel
)
.emptySink(
"end",
"kafka",
KafkaUniversalComponentTransformer.sinkKeyParamName.value -> "".spel,
KafkaUniversalComponentTransformer.sinkRawEditorParamName.value -> "true".spel,
KafkaUniversalComponentTransformer.sinkValueParamName.value -> "#input".spel,
KafkaUniversalComponentTransformer.topicParamName.value -> s"'$outputTopic'".spel,
KafkaUniversalComponentTransformer.contentTypeParamName.value -> s"'${ContentTypes.PLAIN.toString}'".spel,
KafkaUniversalComponentTransformer.sinkValidationModeParamName.value -> s"'${ValidationMode.lax.name}'".spel
)

run(process) {
val outputRecord = kafkaClient
.createConsumer()
.consumeWithConsumerRecord(outputTopic)
.take(1)
.head

outputRecord.value() shouldBe byteString
}
}
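
For readers decoding the fixtures in the test above: shortJsonInHex is the UTF-8 hex encoding of {"a": 5}, and longJsonInHex decodes to the Kowalski record wrapped in literal quote characters (0x22), i.e. a JSON string rather than a JSON object. A self-contained check (fromHex below is a local stand-in for the shaded ByteString.fromHex):

import java.nio.charset.StandardCharsets

def fromHex(hex: String): Array[Byte] =
  hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray

// prints: {"a": 5}
println(new String(fromHex("7b2261223a20357d"), StandardCharsets.UTF_8))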

@@ -117,8 +117,10 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
protected def getVersionOrContentTypeParam(
preparedTopic: PreparedKafkaTopic[TN],
)(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
val topicsWithSchema = topicSelectionStrategy.getTopics(schemaRegistryClient)
if (topicsWithSchema.exists(_.contains(preparedTopic.prepared.topicName.toUnspecialized))) {
if (schemaRegistryClient.isTopicWithSchema(
preparedTopic.prepared.topicName.toUnspecialized.name,
topicSelectionStrategy
)) {
val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false)
(versions match {
case Valid(versions) => Writer[List[ProcessCompilationError], List[Integer]](Nil, versions)
@@ -130,8 +132,8 @@
}).map(getVersionParam)
} else {
val contentTypesValues = List(
FixedExpressionValue("'JSON'", "JSON"),
FixedExpressionValue("'PLAIN'", "PLAIN")
FixedExpressionValue(s"'${ContentTypes.JSON}'", s"${ContentTypes.JSON}"),
FixedExpressionValue(s"'${ContentTypes.PLAIN}'", s"${ContentTypes.PLAIN}")
)

Writer[List[ProcessCompilationError], List[FixedExpressionValue]](Nil, contentTypesValues).map(contentTypes =>
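
A detail worth noting in the constant-based values above: scala.Enumeration values render as their declared names, so the interpolated strings are byte-for-byte the same as the previous hard-coded literals — a quick self-contained check:

// mirrors the ContentTypes enum added in this commit
object ContentTypes extends Enumeration {
  type ContentType = Value
  val JSON, PLAIN = Value
}

assert(s"'${ContentTypes.JSON}'" == "'JSON'")   // same as the old "'JSON'" literal
assert(ContentTypes.PLAIN.toString == "PLAIN")  // same as the old "PLAIN" literal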
@@ -6,8 +6,6 @@ import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema
import pl.touk.nussknacker.engine.schemedkafka.{AvroSchemaDeterminer, RuntimeSchemaData, SchemaDeterminerError}

class BasedOnVersionAvroSchemaDeterminer(
@@ -20,7 +18,7 @@
override def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData[AvroSchema]] = {
val version = versionOption match {
case ExistingSchemaVersion(v) => Some(v)
case _ => None
case LatestSchemaVersion => None
}
schemaRegistryClient
.getFreshSchema(topic, version, isKey = isKey)
@@ -51,22 +49,10 @@ class ParsedSchemaDeterminer(
) {

def determineSchemaUsedInTyping: Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = {
versionOption match {
case ExistingSchemaVersion(v) =>
val version = Some(v)
getTypedSchema(version)
case LatestSchemaVersion =>
val version = None
getTypedSchema(version)
case PassedContentType(typ) =>
getEmptyJsonSchema(typ)
val version = versionOption match {
case ExistingSchemaVersion(v) => Some(v)
case LatestSchemaVersion => None
}

}

private def getTypedSchema(
version: Option[Int]
): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = {
schemaRegistryClient
.getFreshSchema(topic, version, isKey = isKey)
.leftMap(err =>
@@ -77,31 +63,4 @@
)
}

private def getEmptyJsonSchema(
typ: ContentType
): Validated[SchemaDeterminerError, RuntimeSchemaData[ParsedSchema]] = {
typ match {
case ContentTypes.JSON =>
Valid(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](
// Input type in ad hoc or in sink for example is displayed based on this schema, empty makes it Unknown
OpenAPIJsonSchema(
"{}"
)
),
Some(SchemaId.fromString(ContentTypes.JSON.toString))
)
)
case ContentTypes.PLAIN =>
Valid(
RuntimeSchemaData[ParsedSchema](
new NkSerializableParsedSchema[ParsedSchema](OpenAPIJsonSchema("")),
Some(SchemaId.fromString(ContentTypes.PLAIN.toString))
)
)
case _ => Invalid(new SchemaDeterminerError("Wrong dynamic type", SchemaError.apply("Wrong dynamic type")))
}
}

}
@@ -0,0 +1,14 @@
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.OpenAPIJsonSchema

object ContentTypes extends Enumeration {
type ContentType = Value

val JSON, PLAIN = Value
}

object ContentTypesSchemas {
val schemaForJson: OpenAPIJsonSchema = OpenAPIJsonSchema("{}")
val schemaForPlain: OpenAPIJsonSchema = OpenAPIJsonSchema("")
}
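
A sketch of how these constants are meant to be consumed, mirroring the getEmptyJsonSchema logic removed from ParsedSchemaDeterminer above (the schemaFor helper is illustrative, not part of this commit); the empty "{}" schema is what makes a schema-less JSON input display as Unknown in typing:

// illustrative helper — not part of this commit
def schemaFor(contentType: ContentTypes.ContentType): OpenAPIJsonSchema =
  contentType match {
    case ContentTypes.JSON  => ContentTypesSchemas.schemaForJson  // "{}" — types as Unknown
    case ContentTypes.PLAIN => ContentTypesSchemas.schemaForPlain // ""
  }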
@@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry
import cats.data.Validated
import io.confluent.kafka.schemaregistry.ParsedSchema
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
import pl.touk.nussknacker.engine.schemedkafka.TopicSelectionStrategy

trait SchemaRegistryClient extends Serializable {

@@ -39,6 +40,11 @@

def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]

def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy): Boolean = {
val topicsWithSchema = strategy.getTopics(this)
topicsWithSchema.exists(_.map(_.name).contains(topic))
}

}

// This trait is mainly for testing mechanism purpose - in production implementation we assume that all schemas
@@ -1,22 +1,17 @@
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.ContentTypes.ContentType
import pl.touk.nussknacker.engine.util.convert.IntValue

sealed trait SchemaVersionOption

object SchemaVersionOption {

val LatestOptionName = "latest"
val JsonOptionName = "Json"
val PlainOptionName = "Plain"

def byName(name: String): SchemaVersionOption = {
name match {
case `LatestOptionName` => LatestSchemaVersion
case IntValue(version) => ExistingSchemaVersion(version)
case `JsonOptionName` => PassedContentType(ContentTypes.JSON)
case `PlainOptionName` => PassedContentType(ContentTypes.PLAIN)
case _ => throw new IllegalArgumentException(s"Unexpected schema version option: $name")
}
}
@@ -26,11 +21,3 @@
case class ExistingSchemaVersion(version: Int) extends SchemaVersionOption

case object LatestSchemaVersion extends SchemaVersionOption

case class PassedContentType(typ: ContentType) extends SchemaVersionOption

object ContentTypes extends Enumeration {
type ContentType = Value

val JSON, PLAIN = Value
}