Skip to content

Commit

Permalink
Problem with generic/specific record serialization to Arra[byte] whil…
Browse files Browse the repository at this point in the history
…e test data parsing.
  • Loading branch information
gskrobisz committed Jun 28, 2021
1 parent 7f68996 commit 0e46765
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResu
import pl.touk.nussknacker.engine.api.{MetaData, StreamMetaData, VariableConstants}
import pl.touk.nussknacker.engine.avro.KafkaAvroBaseTransformer.{SchemaVersionParamName, TopicParamName}
import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, PaymentV1}
import pl.touk.nussknacker.engine.avro.schema.{AvroStringSettings, FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaVersionOption}
import pl.touk.nussknacker.engine.compile.ExpressionCompiler
Expand All @@ -24,6 +24,7 @@ import pl.touk.nussknacker.engine.testing.LocalModelData
import pl.touk.nussknacker.engine.util.process.EmptyProcessConfigCreator

import java.nio.charset.StandardCharsets
import java.time.{Instant, LocalDateTime, ZoneOffset}
import scala.collection.immutable.ListMap

class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvroSourceSpecMixin {
Expand Down Expand Up @@ -52,6 +53,20 @@ class KafkaAvroPayloadSourceFactorySpec extends KafkaAvroSpecMixin with KafkaAvr
roundTripKeyValueObject(specificSourceFactory[FullNameV1], useStringForKey = true, RecordTopic, ExistingSchemaVersion(1), null, givenObj)
}

test("should read last generated generic record with logical types") {
val givenObj = PaymentDate.record

roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, PaymentDateTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read last generated specific record with logical types ") {
val date = LocalDateTime.of(2020, 1, 2, 3, 14, 15)
val decimal = new java.math.BigDecimal("12.34")
val givenObj = GeneratedAvroClassWithLogicalTypesNewSchema(date.toInstant(ZoneOffset.UTC), date.toLocalDate, date.toLocalTime, "loremipsum", decimal)

roundTripKeyValueObject(avroSourceFactory, useStringForKey = true, GeneratedWithLogicalTypesTopic, ExistingSchemaVersion(1), "", givenObj)
}

test("should read generated record in v2") {
val givenObj = FullNameV2.createRecord("Jan", "Maria", "Kowalski")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.avro.{AvroUtils, TestSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.encode.{BestEffortAvroEncoder, ValidationMode}
import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, PaymentV1}
import pl.touk.nussknacker.engine.avro.schema.{FullNameV1, FullNameV2, GeneratedAvroClassWithLogicalTypesNewSchema, PaymentDate, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor

Expand All @@ -29,6 +29,8 @@ trait KafkaAvroSourceSpecMixin {
val IntTopicWithKey: String = "testAvroIntTopic1WithKey"
val IntTopicNoKey: String = "testAvroIntTopic1NoKey"
val InvalidDefaultsTopic: String = "testAvroInvalidDefaultsTopic1"
val PaymentDateTopic: String = "testPaymentDateTopic"
val GeneratedWithLogicalTypesTopic: String = "testGeneratedWithLogicalTypesTopic"

val IntSchema: Schema = AvroUtils.parseSchema(
"""{
Expand Down Expand Up @@ -62,6 +64,8 @@ trait KafkaAvroSourceSpecMixin {
.register(IntTopicWithKey, IntSchema, 1, isKey = false)
.register(IntTopicWithKey, IntSchema, 1, isKey = true)
.register(InvalidDefaultsTopic, InvalidDefaultsSchema, 1, isKey = false)
.register(PaymentDateTopic, PaymentDate.schema, 1, isKey = false)
.register(GeneratedWithLogicalTypesTopic, GeneratedAvroClassWithLogicalTypesNewSchema.schema, 1, isKey = false)
.build

val factory: CachedConfluentSchemaRegistryClientFactory = TestSchemaRegistryClientFactory(schemaRegistryMockClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TopicSelectionStrategySpec extends KafkaAvroSpecMixin with KafkaAvroSource

test("all topic strategy test") {
val strategy = new AllTopicsSelectionStrategy()
strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic))
strategy.getTopics(confluentClient).toList.map(_.toSet) shouldBe List(Set(RecordTopic, RecordTopicWithKey, IntTopicNoKey, IntTopicWithKey, InvalidDefaultsTopic, PaymentDateTopic, GeneratedWithLogicalTypesTopic))
}

test("topic filtering strategy test") {
Expand Down

0 comments on commit 0e46765

Please sign in to comment.