Problem with generic/specific record serialization to Array[byte] #1840

Merged · 3 commits · Jul 1, 2021

Changes from 2 commits
@@ -3,31 +3,32 @@ package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter
import io.circe.Json
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
-import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer
import org.apache.avro.Schema.Type
import org.apache.avro.io.DecoderFactory
import org.apache.avro.util.Utf8
import org.apache.avro.Schema
import org.apache.kafka.common.errors.SerializationException
import pl.touk.nussknacker.engine.avro.AvroUtils
-import pl.touk.nussknacker.engine.avro.schema.DatumReaderWriterMixin
+import pl.touk.nussknacker.engine.avro.schema.{DatumReaderWriterMixin, DefaultAvroSchemaEvolution}
+import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.AbstractConfluentKafkaAvroSerializer

import scala.reflect.ClassTag

/**
* @param schemaRegistryClient schema registry client
*/
private[confluent] class ConfluentAvroMessageReader(schemaRegistryClient: SchemaRegistryClient)
-extends AbstractKafkaAvroSerializer with DatumReaderWriterMixin {
+extends AbstractConfluentKafkaAvroSerializer(new DefaultAvroSchemaEvolution) with DatumReaderWriterMixin {

schemaRegistry = schemaRegistryClient

private val decoderFactory = DecoderFactory.get

-def readJson[T: ClassTag](jsonObj: Json, schema: Schema, subject: String): Array[Byte] = {
+def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
try {
val avroObj = jsonToAvro[T](jsonObj, schema)
-serializeImpl(subject, avroObj, new AvroSchema(schema))
+val avroSchema = new AvroSchema(schema)
+writeData(avroObj, avroSchema.rawSchema(), schemaId)
Collaborator:
I think you can just use schema here?

Member Author:
Fixed.
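For readers following the thread: applying the reviewer's suggestion presumably leaves the method looking roughly like the sketch below. This is based only on the diff above and the comment; the actual third commit is not shown here.

```scala
def readJson[T: ClassTag](jsonObj: Json, schemaId: Int, schema: Schema): Array[Byte] = {
  try {
    val avroObj = jsonToAvro[T](jsonObj, schema)
    // `schema` is already an org.apache.avro.Schema, so the AvroSchema wrapper
    // and the rawSchema() round-trip are unnecessary
    writeData(avroObj, schema, schemaId)
  } catch {
    case ex: Exception =>
      throw new SerializationException("Error reading from input", ex)
  }
}
```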

} catch {
case ex: Exception =>
throw new SerializationException("Error reading from input", ex)
@@ -81,10 +81,10 @@ class ConfluentAvroToJsonFormatter[K: ClassTag, V: ClassTag](kafkaConfig: KafkaC
}
} else {
val keySchema = record.keySchemaId.map(id => getSchemaById(id)).getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
-keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchema, ConfluentUtils.keySubject(topic))).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
+keyOpt.map(keyJson => messageReader.readJson[K](keyJson, record.keySchemaId.get, keySchema)).getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
Collaborator:
Let's write `val keySchemaId = record.keySchemaId.getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))` and `val keySchema = getSchemaById(keySchemaId)` to avoid this nasty `.get` :)

Member Author:
Fixed, the nasty `.get` is removed.
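Spelled out, the refactor the reviewer proposes would presumably read roughly as follows in the surrounding branch (an illustrative sketch; the actual follow-up commit is not shown in this diff):

```scala
} else {
  val keySchemaId = record.keySchemaId
    .getOrElse(throw new IllegalArgumentException("Error reading key schema: empty schema id"))
  val keySchema = getSchemaById(keySchemaId)
  keyOpt.map(keyJson => messageReader.readJson[K](keyJson, keySchemaId, keySchema))
    .getOrElse(throw new IllegalArgumentException("Error reading key schema: expected valid avro key"))
}
```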

}
val valueSchema = getSchemaById(record.valueSchemaId)
-val valueBytes = messageReader.readJson[V](value, valueSchema, ConfluentUtils.valueSubject(topic))
+val valueBytes = messageReader.readJson[V](value, record.valueSchemaId, valueSchema)
(keyBytes, valueBytes)
}
record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
@@ -23,6 +23,23 @@ import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic, Record

import scala.reflect.ClassTag

+/**
+ * Base implementation of a KafkaSource factory with Avro schema support. It is based on GenericNodeTransformation to:
+ *  - allow key and value type identification based on the Schema Registry, and
+ *  - allow Context initialization with the event's value, key and metadata.
+ * You can provide schemas for both key and value. When useStringForKey = true (see KafkaConfig), the contents of the event's key
+ * are treated as String (this is the default scenario).
+ * The reader schema used at runtime is determined by topic and version.
+ * The reader schema can differ from the schema used by the writer (e.g. when the writer produces an event with a newer schema); in that case "schema evolution" may be required.
+ * For SpecificRecord use SpecificRecordKafkaAvroSourceFactory.
+ * Assumptions:
+ * 1. Every incoming event has its key and value schemas registered in the Schema Registry.
+ * 2. The Avro payload must include the schema id for both Generic and Specific records (to support "schema evolution" we need to know the exact writer's schema).
+ *
+ * @param schemaRegistryProvider - provides a set of strategies for serialization and deserialization during event processing and/or testing.
+ * @tparam K - type of the event's key, used to determine whether the key object is Specific or Generic (for GenericRecords use Any)
+ * @tparam V - type of the event's value, used to determine whether the value object is Specific or Generic (for GenericRecords use Any)
+ */
class KafkaAvroSourceFactory[K:ClassTag, V:ClassTag](val schemaRegistryProvider: SchemaRegistryProvider,
val processObjectDependencies: ProcessObjectDependencies,
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[K, V]]])
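The Scaladoc added to KafkaAvroSourceFactory above hinges on two Avro facts: the consumer must know the exact writer's schema (hence the schema id embedded in the payload), and the reader may decode with a different, evolved schema. The sketch below illustrates that resolution step with the plain Avro API only; it uses no Nussknacker classes, and the record and field names are made up for illustration.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SchemaEvolutionSketch extends App {
  // Writer's schema: what the producer used when the event was written.
  val writerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"FullName","fields":[
      |  {"name":"first","type":"string"},
      |  {"name":"last","type":"string"}
      |]}""".stripMargin)

  // Reader's schema: an evolved version with an extra defaulted field.
  val readerSchema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"FullName","fields":[
      |  {"name":"first","type":"string"},
      |  {"name":"last","type":"string"},
      |  {"name":"middle","type":"string","default":""}
      |]}""".stripMargin)

  // Serialize a record using the writer's schema.
  val record = new GenericData.Record(writerSchema)
  record.put("first", "Jan")
  record.put("last", "Kowalski")
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
  encoder.flush()

  // Deserialize with BOTH schemas: Avro resolves the differences between them.
  // This is why the exact writer's schema (looked up by schema id) must be known.
  val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
  val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val evolved = reader.read(null, decoder)
  println(evolved) // {"first": "Jan", "last": "Kowalski", "middle": ""}
}
```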
@@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResu
import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData, VariableConstants}
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
-import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaVersionOption}
import pl.touk.nussknacker.engine.compile.ExpressionCompiler
@@ -24,6 +24,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

import java.nio.charset.StandardCharsets
+import java.time.{LocalDateTime, ZoneOffset}
import scala.collection.immutable.ListMap

class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvroSourceSpecMixin {
@@ -52,6 +53,20 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
roundTripKeyValueObject(specificSourceFactory[FullNameV1], useStringForKey = true, RecordTopic, ExistingSchemaVersion(1), null, givenObj)
}

test("should read last generated generic record with logical types") {
val givenObj = PaymentDate.record

roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, PaymentDateTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read last generated specific record with logical types ") {
val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
val decimal = new java.math.BigDecimal("12.34")
val givenObj = new GeneratedAvroClassWithLogicalTypes("loremipsum", date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, decimal)

roundTripKeyValueObject(specificSourceFactory[GeneratedAvroClassWithLogicalTypes], useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read generated record in v2") {
val givenObj = FullNameV2.createRecord("Jan", "Maria", "Kowalski")

@@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.avro.{AvroUtils, TestSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
-import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, PaymentV1}
+import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypes, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor

@@ -29,6 +29,8 @@ trait KafkaAvroSourceSpecMixin {
val IntTopicWithKey: String = "testAvroIntTopic1WithKey"
val IntTopicNoKey: String = "testAvroIntTopic1NoKey"
val InvalidDefaultsTopic: String = "testAvroInvalidDefaultsTopic1"
+val PaymentDateTopic: String = "testPaymentDateTopic"
+val GeneratedWithLogicalTypesTopic: String = "testGeneratedWithLogicalTypesTopic"

val IntSchema: Schema = AvroUtils.parseSchema(
"""{
@@ -53,6 +55,7 @@
""".stripMargin
)

+// ALL schemas, for Generic and Specific records, must be registered in the schema registry
val schemaRegistryMockClient: CSchemaRegistryClient = new MockConfluentSchemaRegistryClientBuilder()
.register(RecordTopic, FullNameV1.schema, 1, isKey = false)
.register(RecordTopic, FullNameV2.schema, 2, isKey = false)
@@ -62,6 +65,8 @@ trait KafkaAvroSourceSpecMixin {
.register(IntTopicWithKey, IntSchema, 1, isKey = false)
.register(IntTopicWithKey, IntSchema, 1, isKey = true)
.register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
+.register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
+.register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypes.getClassSchema, 1, isKey = false)
.build

val factory: CachedConfluentSchemaRegistryClientFactory = TestSchemaRegistryClientFactory(schemaRegistryMockClient)
@@ -19,7 +19,7 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource

test("all topic strategy test") {
val strategy = new AllTopicsSelectionStrategy()
-strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic))
+strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic, PaymentDateTopic, GeneratedWithLogicalTypesTopic))
}

test("topic filtering strategy test") {