From e66c5070edda0153cd86d360f160c074bcfccab3 Mon Sep 17 00:00:00 2001 From: Arek Burdach Date: Tue, 19 Jul 2022 16:43:25 +0200 Subject: [PATCH] Using ParsedSchema as long as it is possible #3 --- docs/Changelog.md | 4 ++- docs/MigrationGuide.md | 12 +++++--- .../KafkaAvroTestProcessConfigCreator.scala | 4 +-- .../KafkaJsonPayloadIntegrationSpec.scala | 4 +-- .../avro/helpers/KafkaAvroSpecMixin.scala | 4 +-- .../ConfluentKafkaAvroSeDeSpecMixin.scala | 4 +-- ...SinkFactoryWithEditorIntegrationTest.scala | 2 +- .../FlinkKafkaComponentProvider.scala | 28 ++++++++----------- .../sample/DevProcessConfigCreator.scala | 6 ++-- .../MockFlinkKafkaComponentProvider.scala | 2 +- .../LiteKafkaComponentProvider.scala | 28 ++++++++----------- ...r.scala => SchemaBasedSerdeProvider.scala} | 2 +- .../ConfluentSchemaBasedSerdeProvider.scala | 5 ++-- ...fkaAvroDeserializationSchemaFactory.scala} | 1 - ...fluentJsonPayloadDeserializerFactory.scala | 1 - .../avro/sink/KafkaAvroSinkFactory.scala | 4 +-- .../sink/KafkaAvroSinkFactoryWithEditor.scala | 4 +-- .../avro/sink/UniversalKafkaSinkFactory.scala | 4 +-- .../avro/source/KafkaAvroSourceFactory.scala | 4 +-- ...SpecificRecordKafkaAvroSourceFactory.scala | 4 +-- .../source/UniversalKafkaSourceFactory.scala | 4 +-- .../DelayedKafkaAvroSourceFactory.scala | 4 +-- 22 files changed, 64 insertions(+), 71 deletions(-) rename utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/{SchemaBasedMessagesSerdeProvider.scala => SchemaBasedSerdeProvider.scala} (91%) rename utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/{ConfluentKafkaAvroDeserializerFactory.scala => ConfluentKafkaAvroDeserializationSchemaFactory.scala} (98%) diff --git a/docs/Changelog.md b/docs/Changelog.md index d894d9726e0..31f36512dde 100644 --- a/docs/Changelog.md +++ b/docs/Changelog.md @@ -33,7 +33,9 @@ * [#3238](https://github.com/TouK/nussknacker/pull/3238) K8 runtime's logback conf can be stored in single ConfigMap for all runtime pods * [#3201](https://github.com/TouK/nussknacker/pull/3201) Added literal types * [#3240](https://github.com/TouK/nussknacker/pull/3240) Error topic created by default if not exists -* [#3245](https://github.com/TouK/nussknacker/pull/3245) [#3265](https://github.com/TouK/nussknacker/pull/3265)[#3288](https://github.com/TouK/nussknacker/pull/3288)[#3295](https://github.com/TouK/nussknacker/pull/3295) Universal kafka source/sink, handling multiple scenarios like: avro message for avro schema, json message for json schema +* [#3245](https://github.com/TouK/nussknacker/pull/3245) [#3265](https://github.com/TouK/nussknacker/pull/3265) + [#3288](https://github.com/TouK/nussknacker/pull/3288) [#3295](https://github.com/TouK/nussknacker/pull/3295) [#3297](https://github.com/TouK/nussknacker/pull/3297) + Universal kafka source/sink, handling multiple scenarios like: avro message for avro schema, json message for json schema * [#3249](https://github.com/TouK/nussknacker/pull/3249) Confluent 5.5->7.2, avro 1.9->1.11 bump * [#3250](https://github.com/TouK/nussknacker/pull/3250) Kafka 2.4 -> 3.2, flink 0.14.4 -> 0.14.5 * [#3270](https://github.com/TouK/nussknacker/pull/3270) Added type representing null diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index 3e98023a907..f986fb29521 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -26,13 +26,17 @@ To see the biggest differences please consult the [changelog](Changelog.md). * Invalid is representation of process compilation errors * Valid is representation of positive and negative scenario running result * [#3255](https://github.com/TouK/nussknacker/pull/3255) `TestReporter` util class is safer to use in parallel tests, methods require passing scenario name -* [#3265](https://github.com/TouK/nussknacker/pull/3265) Initial work on UniversalKafkaSource/Sink: +* [#3265](https://github.com/TouK/nussknacker/pull/3265) [#3288](https://github.com/TouK/nussknacker/pull/3288) [3297](https://github.com/TouK/nussknacker/pull/3297) Changes related with UniversalKafkaSource/Sink: * `RuntimeSchemaData` is generic - parametrized by `ParsedSchema` (but only AvroSchema is supported for now). + * `NkSerializableAvroSchema` renamed to `NkSerializableParsedoSchema * `SchemaWithMetadata` wraps `ParsedSchema` instead of avro `Schema`. + * `SchemaRegistryProvider` refactoring: + * rename `SchemaRegistryProvider` to `SchemaBasedSerdeProvider` + * decouple `SchemaRegistryClientFactory` from `SchemaBasedSerdeProvider` + * `KafkaAvroKeyValueDeserializationSchemaFactory` renamed to `KafkaSchemaBasedKeyValueDeserializationSchemaFactory` + * `KafkaAvroValueSerializationSchemaFactory` renamed to `KafkaSchemaBasedValueSerializationSchemaFactory` + * `KafkaAvroKeyValueSerializationSchemaFactory` renamed to `KafkaSchemaBasedKeyValueSerializationSchemaFactory` * [#3253](https://github.com/TouK/nussknacker/pull/3253) `DeploymentManager` has separate `validate` method, which should perform initial scenario validation and return reasonably quickly (while deploy can e.g. make Flink savepoint etc.) -* [#3288](https://github.com/TouK/nussknacker/pull/3288) `SchemaRegistryProvider` refactoring: - * rename `SchemaRegistryProvider` to `SchemaBasedMessagesSerdeProvider` - * decouple `BaseSchemaRegistryProvider` from `SchemaBasedMessagesSerdeProvider` ### REST API changes diff --git a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaAvroTestProcessConfigCreator.scala b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaAvroTestProcessConfigCreator.scala index 69e1f05a4c1..271fb0c3dec 100644 --- a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaAvroTestProcessConfigCreator.scala +++ b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaAvroTestProcessConfigCreator.scala @@ -4,7 +4,7 @@ import org.apache.avro.specific.SpecificRecord import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.process._ import pl.touk.nussknacker.engine.avro.schema.{GeneratedAvroClassSample, GeneratedAvroClassWithLogicalTypes} -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider +import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory @@ -62,7 +62,7 @@ abstract class KafkaAvroTestProcessConfigCreator extends EmptyProcessConfigCreat protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory - protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) + protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) } diff --git a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaJsonPayloadIntegrationSpec.scala b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaJsonPayloadIntegrationSpec.scala index 70708d2570a..21d1f479216 100644 --- a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaJsonPayloadIntegrationSpec.scala +++ b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/KafkaJsonPayloadIntegrationSpec.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.avro.helpers.{KafkaAvroSpecMixin, SimpleKafkaJ import pl.touk.nussknacker.engine.avro.schema.{GeneratedAvroClassSampleSchema, PaymentV1} import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient} -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider} import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar import pl.touk.nussknacker.engine.testing.LocalModelData @@ -17,7 +17,7 @@ class KafkaJsonPayloadIntegrationSpec extends FunSuite with KafkaAvroSpecMixin w import KafkaAvroIntegrationMockSchemaRegistry._ private lazy val creator: KafkaAvroTestProcessConfigCreator = new KafkaAvroTestProcessConfigCreator { - override protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = + override protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory) override protected def schemaRegistryClientFactory = new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient) diff --git a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/helpers/KafkaAvroSpecMixin.scala b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/helpers/KafkaAvroSpecMixin.scala index fc06eea06ae..e7a54054ede 100644 --- a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/helpers/KafkaAvroSpecMixin.scala +++ b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/helpers/KafkaAvroSpecMixin.scala @@ -19,7 +19,7 @@ import pl.touk.nussknacker.engine.avro.encode.ValidationMode import pl.touk.nussknacker.engine.avro.kryo.AvroSerializersRegistrar import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaBasedMessagesSerdeProvider, SchemaVersionOption} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaBasedSerdeProvider, SchemaVersionOption} import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactory import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory import pl.touk.nussknacker.engine.avro.source.{KafkaAvroSourceFactory, SpecificRecordKafkaAvroSourceFactory} @@ -51,7 +51,7 @@ trait KafkaAvroSpecMixin extends FunSuite with KafkaWithSchemaRegistryOperations protected def confluentClientFactory: ConfluentSchemaRegistryClientFactory // In default test scenario we use avro payload. - protected lazy val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = + protected lazy val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentClientFactory) protected def executionConfigPreparerChain(modelData: LocalModelData): ExecutionConfigPreparer = diff --git a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroSeDeSpecMixin.scala b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroSeDeSpecMixin.scala index d632474c97b..4bdce627433 100644 --- a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroSeDeSpecMixin.scala +++ b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroSeDeSpecMixin.scala @@ -6,7 +6,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks import pl.touk.nussknacker.engine.avro.TestSchemaRegistryClientFactory import pl.touk.nussknacker.engine.avro.helpers._ import pl.touk.nussknacker.engine.avro.schema.FullNameV1 -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider +import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder} import pl.touk.nussknacker.engine.kafka.KafkaClient @@ -38,7 +38,7 @@ trait ConfluentKafkaAvroSeDeSpecMixin extends SchemaRegistryMixin with TableDriv } case class SchemaRegistryProviderSetup(`type`: SchemaRegistryProviderSetupType.Value, - provider: SchemaBasedMessagesSerdeProvider, + provider: SchemaBasedSerdeProvider, override val valueSerializer: Serializer[Any], valueDeserializer: Deserializer[Any]) extends KafkaWithSchemaRegistryOperations { diff --git a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/sink/flink/KafkaAvroSinkFactoryWithEditorIntegrationTest.scala b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/sink/flink/KafkaAvroSinkFactoryWithEditorIntegrationTest.scala index e782d37d6c0..bd81e4becb5 100644 --- a/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/sink/flink/KafkaAvroSinkFactoryWithEditorIntegrationTest.scala +++ b/engine/flink/avro-components-utils/src/test/scala/pl/touk/nussknacker/engine/avro/sink/flink/KafkaAvroSinkFactoryWithEditorIntegrationTest.scala @@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin import pl.touk.nussknacker.engine.avro.schema.TestSchemaWithRecord import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory} -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider} import pl.touk.nussknacker.engine.avro.{AvroUtils, KafkaAvroTestProcessConfigCreator} import pl.touk.nussknacker.engine.graph.expression import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler diff --git a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala index c4fdf2f2c9c..cf05be53425 100644 --- a/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala +++ b/engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala @@ -4,7 +4,6 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigValueFactory.fromAnyRef import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory @@ -20,12 +19,6 @@ class FlinkKafkaComponentProvider extends ComponentProvider { protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory - protected def createAvroSchemaRegistryProvider: SchemaBasedMessagesSerdeProvider = - ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) - - protected def createJsonSchemaRegistryProvider: SchemaBasedMessagesSerdeProvider = - ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory) - override def providerName: String = "kafka" override def resolveConfigForExecution(config: Config): Config = config @@ -38,25 +31,26 @@ class FlinkKafkaComponentProvider extends ComponentProvider { val schemaRegistryTypedJson = "DataSourcesAndSinks#schema-registry--json-serialization" val noTypeInfo = "DataSourcesAndSinks#no-type-information--json-serialization" - val avroSerializingSchemaRegistryProvider = createAvroSchemaRegistryProvider - val jsonSerializingSchemaRegistryProvider = createJsonSchemaRegistryProvider + val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) + val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory) + val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory) val lowLevelKafkaComponents = List( ComponentDefinition("kafka-json", new GenericKafkaJsonSinkFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo), ComponentDefinition("kafka-json", new GenericJsonSourceFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo), ComponentDefinition("kafka-typed-json", new GenericTypedJsonSourceFactory(overriddenDependencies)).withRelativeDocs("DataSourcesAndSinks#manually-typed--json-serialization"), - ComponentDefinition("kafka-avro", new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro), - ComponentDefinition("kafka-avro", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro), - ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition("kafka-registry-typed-json-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition("kafka-avro-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro) + ComponentDefinition("kafka-avro", new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro), + ComponentDefinition("kafka-avro", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro), + ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition("kafka-registry-typed-json-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition("kafka-avro-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro) ) // TODO: change link to the documentation when json schema handling will be available val universalKafkaComponents = List( - ComponentDefinition("kafka", new UniversalKafkaSourceFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro), - ComponentDefinition("kafka", new UniversalKafkaSinkFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) + ComponentDefinition("kafka", new UniversalKafkaSourceFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro), + ComponentDefinition("kafka", new UniversalKafkaSinkFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) lowLevelKafkaComponents ::: universalKafkaComponents } diff --git a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala index 42a7f621c63..c8d6b734722 100644 --- a/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala +++ b/engine/flink/management/dev-model/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala @@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.api._ import pl.touk.nussknacker.engine.api.component.{ComponentGroupName, ParameterConfig, SingleComponentConfig} import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, MandatoryParameterValidator, StringParameterEditor} import pl.touk.nussknacker.engine.api.process._ -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider +import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient} import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactoryWithEditor @@ -67,13 +67,13 @@ class DevProcessConfigCreator extends ProcessConfigCreator { override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = { val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies) - val schemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory) + val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory) Map( "sendSms" -> all(new SingleValueSinkFactory(new DiscardingSink)), "monitor" -> categories(SinkFactory.noParam(EmptySink)), "communicationSink" -> categories(DynamicParametersSink), "kafka-string" -> all(new KafkaSinkFactory(new SimpleSerializationSchema[AnyRef](_, String.valueOf), processObjectDependencies, FlinkKafkaSinkImplFactory)), - "kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(confluentFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory)) + "kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(confluentFactory, avroPayloadSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory)) ) } diff --git a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/MockFlinkKafkaComponentProvider.scala b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/MockFlinkKafkaComponentProvider.scala index a1f603dcf42..f00c3949793 100644 --- a/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/MockFlinkKafkaComponentProvider.scala +++ b/engine/flink/tests/src/test/scala/pl/touk/nussknacker/defaultmodel/MockFlinkKafkaComponentProvider.scala @@ -1,7 +1,7 @@ package pl.touk.nussknacker.defaultmodel import pl.touk.nussknacker.defaultmodel.MockSchemaRegistry.schemaRegistryMockClient -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider +import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.MockConfluentSchemaRegistryClientFactory import pl.touk.nussknacker.engine.flink.util.transformer.FlinkKafkaComponentProvider diff --git a/engine/lite/components/kafka/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteKafkaComponentProvider.scala b/engine/lite/components/kafka/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteKafkaComponentProvider.scala index da88b934bce..d4236c7ad1f 100644 --- a/engine/lite/components/kafka/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteKafkaComponentProvider.scala +++ b/engine/lite/components/kafka/src/main/scala/pl/touk/nussknacker/engine/lite/components/LiteKafkaComponentProvider.scala @@ -4,7 +4,6 @@ import com.typesafe.config.Config import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.api.typed.TypedMap -import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory} @@ -37,12 +36,6 @@ class LiteKafkaComponentProvider(schemaRegistryClientFactory: ConfluentSchemaReg override def providerName: String = "kafka" - protected def createAvroSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = - ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) - - protected def createJsonSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = - ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory) - override def resolveConfigForExecution(config: Config): Config = config override def create(config: Config, dependencies: ProcessObjectDependencies): List[ComponentDefinition] = { @@ -52,8 +45,9 @@ class LiteKafkaComponentProvider(schemaRegistryClientFactory: ConfluentSchemaReg val schemaRegistryTypedJson = "DataSourcesAndSinks#schema-registry--json-serialization" val noTypeInfo = "DataSourcesAndSinks#no-type-information--json-serialization" - val avroPayloadAvroSchemaBasedMessagesSerdeProvider = createAvroSchemaBasedMessagesSerdeProvider - val jsonPayloadAvroSchemaBasedMessagesSerdeProvider = createJsonSchemaBasedMessagesSerdeProvider + val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory) + val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory) + val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory) val lowLevelKafkaComponents = List( ComponentDefinition(KafkaJsonName, new KafkaSinkFactory(GenericJsonSerialization(_), dependencies, LiteKafkaSinkImplFactory)).withRelativeDocs(noTypeInfo), @@ -63,17 +57,17 @@ class LiteKafkaComponentProvider(schemaRegistryClientFactory: ConfluentSchemaReg ConsumerRecordDeserializationSchemaFactory.fixedValueDeserialization(deserializeToTypedMap), jsonFormatterFactory, dependencies, new LiteKafkaSourceImplFactory ) with BaseGenericTypedJsonSourceFactory).withRelativeDocs("DataSourcesAndSinks#manually-typed--json-serialization"), - ComponentDefinition(KafkaAvroName, new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(avro), - ComponentDefinition(KafkaAvroName, new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro), - ComponentDefinition(KafkaRegistryTypedJsonName, new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition(KafkaRegistryTypedJsonName, new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition(KafkaSinkRegistryTypedRawJsonName, new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), - ComponentDefinition(KafkaSinkRawAvroName, new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) + ComponentDefinition(KafkaAvroName, new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(avro), + ComponentDefinition(KafkaAvroName, new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroPayloadSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro), + ComponentDefinition(KafkaRegistryTypedJsonName, new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition(KafkaRegistryTypedJsonName, new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonPayloadSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition(KafkaSinkRegistryTypedRawJsonName, new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson), + ComponentDefinition(KafkaSinkRawAvroName, new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) // TODO: change link to the documentation when json schema handling will be available val universalKafkaComponents = List( - ComponentDefinition(KafkaUniversalName, new UniversalKafkaSourceFactory(schemaRegistryClientFactory, avroPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(avro), - ComponentDefinition(KafkaUniversalName, new UniversalKafkaSinkFactory(schemaRegistryClientFactory, avroPayloadAvroSchemaBasedMessagesSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) + ComponentDefinition(KafkaUniversalName, new UniversalKafkaSourceFactory(schemaRegistryClientFactory, universalSerdeProvider, dependencies, new LiteKafkaSourceImplFactory)).withRelativeDocs(avro), + ComponentDefinition(KafkaUniversalName, new UniversalKafkaSinkFactory(schemaRegistryClientFactory, universalSerdeProvider, dependencies, LiteKafkaAvroSinkImplFactory)).withRelativeDocs(avro)) lowLevelKafkaComponents ::: universalKafkaComponents } diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedMessagesSerdeProvider.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedSerdeProvider.scala similarity index 91% rename from utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedMessagesSerdeProvider.scala rename to utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedSerdeProvider.scala index 71bbfea2d75..fde61a058fd 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedMessagesSerdeProvider.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/SchemaBasedSerdeProvider.scala @@ -5,7 +5,7 @@ import io.confluent.kafka.schemaregistry.ParsedSchema import pl.touk.nussknacker.engine.avro.serialization.{KafkaSchemaBasedDeserializationSchemaFactory, KafkaSchemaBasedSerializationSchemaFactory} import pl.touk.nussknacker.engine.kafka.RecordFormatterFactory -trait SchemaBasedMessagesSerdeProvider extends Serializable { +trait SchemaBasedSerdeProvider extends Serializable { def deserializationSchemaFactory: KafkaSchemaBasedDeserializationSchemaFactory diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentSchemaBasedSerdeProvider.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentSchemaBasedSerdeProvider.scala index d3bee1852a7..50fc811f05e 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentSchemaBasedSerdeProvider.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/ConfluentSchemaBasedSerdeProvider.scala @@ -10,13 +10,13 @@ import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.formatter.{Confl import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.jsonpayload.{ConfluentJsonPayloadSerializerFactory, ConfluentKeyValueKafkaJsonDeserializerFactory} import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.universal.ConfluentKeyValueUniversalKafkaDeserializationFactory import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.{ConfluentKeyValueKafkaAvroDeserializationFactory, ConfluentSchemaBasedSerializationSchemaFactory} -import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedMessagesSerdeProvider, SchemaRegistryError, SchemaRegistryUnsupportedTypeError} +import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryError, SchemaRegistryUnsupportedTypeError} import pl.touk.nussknacker.engine.avro.serialization.{KafkaSchemaBasedDeserializationSchemaFactory, KafkaSchemaBasedSerializationSchemaFactory} import pl.touk.nussknacker.engine.kafka.RecordFormatterFactory class ConfluentSchemaBasedSerdeProvider(val serializationSchemaFactory: KafkaSchemaBasedSerializationSchemaFactory, val deserializationSchemaFactory: KafkaSchemaBasedDeserializationSchemaFactory, - val recordFormatterFactory: RecordFormatterFactory) extends SchemaBasedMessagesSerdeProvider { + val recordFormatterFactory: RecordFormatterFactory) extends SchemaBasedSerdeProvider { override def validateSchema[T <: ParsedSchema](schema: T): ValidatedNel[SchemaRegistryError, T] = { schema match { case s: AvroSchema => { @@ -37,6 +37,7 @@ object ConfluentSchemaBasedSerdeProvider extends Serializable { def universal(schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory): ConfluentSchemaBasedSerdeProvider = { ConfluentSchemaBasedSerdeProvider( + // TODO: add "switch" in serialization factory new ConfluentSchemaBasedSerializationSchemaFactory(schemaRegistryClientFactory), new ConfluentKeyValueUniversalKafkaDeserializationFactory(schemaRegistryClientFactory), new ConfluentAvroToJsonFormatterFactory(schemaRegistryClientFactory) diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializerFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala similarity index 98% rename from utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializerFactory.scala rename to utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala index 829fc1a9894..da4a8d40f86 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializerFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/ConfluentKafkaAvroDeserializationSchemaFactory.scala @@ -21,7 +21,6 @@ trait ConfluentKafkaAvroDeserializerFactory extends LazyLogging { val avroSchemaDataOpt = schemaDataOpt.map { schemaData => schemaData.schema match { case _: AvroSchema => schemaData.asInstanceOf[RuntimeSchemaData[AvroSchema]] - // TODO: handle json schema case other => throw new IllegalArgumentException(s"Unsupported schema class: ${other.getClass}") } } diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/jsonpayload/ConfluentJsonPayloadDeserializerFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/jsonpayload/ConfluentJsonPayloadDeserializerFactory.scala index 774576a0b2c..63cb0834afd 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/jsonpayload/ConfluentJsonPayloadDeserializerFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/schemaregistry/confluent/serialization/jsonpayload/ConfluentJsonPayloadDeserializerFactory.scala @@ -72,7 +72,6 @@ trait ConfluentJsonPayloadDeserializer { val avroSchemaDataOpt = schemaDataOpt.map { schemaData => schemaData.schema match { case _: AvroSchema => schemaData.asInstanceOf[RuntimeSchemaData[AvroSchema]] - // TODO: handle json schema case other => throw new IllegalArgumentException(s"Unsupported schema class: ${other.getClass}") } } diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala index 6da26ef0bb3..46c991f4dcf 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactory.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Sink, import pl.touk.nussknacker.engine.api.typed.CustomNodeValidationException import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData} import pl.touk.nussknacker.engine.avro.encode.{OutputValidator, ValidationMode} -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.{KafkaAvroBaseComponentTransformer, KafkaAvroBaseTransformer, RuntimeSchemaData, SchemaDeterminerErrorHandler} import pl.touk.nussknacker.engine.api.NodeId @@ -30,7 +30,7 @@ object KafkaAvroSinkFactory { } class KafkaAvroSinkFactory(val schemaRegistryClientFactory: SchemaRegistryClientFactory, - val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, val processObjectDependencies: ProcessObjectDependencies, implProvider: KafkaAvroSinkImplFactory) extends KafkaAvroBaseTransformer[Sink] with SinkFactory { diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala index c88d856d30d..ce5754f7a5d 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/KafkaAvroSinkFactoryWithEditor.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Sink, import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData} import pl.touk.nussknacker.engine.avro.KafkaAvroBaseComponentTransformer.{SchemaVersionParamName, SinkKeyParamName} import pl.touk.nussknacker.engine.avro.encode.ValidationMode -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactoryWithEditor.TransformationState import pl.touk.nussknacker.engine.avro.{KafkaAvroBaseComponentTransformer, KafkaAvroBaseTransformer, RuntimeSchemaData, SchemaDeterminerErrorHandler} import pl.touk.nussknacker.engine.api.NodeId @@ -28,7 +28,7 @@ object KafkaAvroSinkFactoryWithEditor { } class KafkaAvroSinkFactoryWithEditor(val schemaRegistryClientFactory: SchemaRegistryClientFactory, - val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, val processObjectDependencies: ProcessObjectDependencies, implProvider: KafkaAvroSinkImplFactory) extends KafkaAvroBaseTransformer[Sink] with SinkFactory { diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/UniversalKafkaSinkFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/UniversalKafkaSinkFactory.scala index f3ccc9d06d2..2f09008ae9d 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/UniversalKafkaSinkFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/sink/UniversalKafkaSinkFactory.scala @@ -10,7 +10,7 @@ import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Sink, import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId} import pl.touk.nussknacker.engine.avro.KafkaAvroBaseComponentTransformer.{SchemaVersionParamName, SinkKeyParamName} import pl.touk.nussknacker.engine.avro.encode.ValidationMode -import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.sink.UniversalKafkaSinkFactory.TransformationState import pl.touk.nussknacker.engine.avro.{KafkaAvroBaseComponentTransformer, KafkaAvroBaseTransformer, RuntimeSchemaData, SchemaDeterminerErrorHandler} import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue @@ -31,7 +31,7 @@ object UniversalKafkaSinkFactory { } class UniversalKafkaSinkFactory(val schemaRegistryClientFactory: SchemaRegistryClientFactory, - val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, val processObjectDependencies: ProcessObjectDependencies, implProvider: KafkaAvroSinkImplFactory) extends KafkaAvroBaseTransformer[Sink] with SinkFactory { diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala index 246505ab57b..7198735321c 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/KafkaAvroSourceFactory.scala @@ -13,7 +13,7 @@ import pl.touk.nussknacker.engine.api.definition.{NodeDependency, OutputVariable import pl.touk.nussknacker.engine.api.process.{ContextInitializer, ProcessObjectDependencies, Source, SourceFactory} import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown} import pl.touk.nussknacker.engine.avro.KafkaAvroBaseComponentTransformer.SchemaVersionParamName -import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.source.KafkaAvroSourceFactory.KafkaAvroSourceFactoryState import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor import pl.touk.nussknacker.engine.avro.{AvroSchemaDeterminer, KafkaAvroBaseTransformer, RuntimeSchemaData} @@ -40,7 +40,7 @@ import scala.reflect.ClassTag * @tparam V - type of event's value, used to determine if value object is Specific or Generic (for GenericRecords use Any) */ class KafkaAvroSourceFactory[K: ClassTag, V: ClassTag](val schemaRegistryClientFactory: SchemaRegistryClientFactory, - val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, val processObjectDependencies: ProcessObjectDependencies, protected val implProvider: KafkaSourceImplFactory[K, V]) extends SourceFactory diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala index ccace1dbec9..b6cc0303569 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/SpecificRecordKafkaAvroSourceFactory.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, NodeDependencyValue} import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.api.typed.typing.Typed -import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.{AvroUtils, RuntimeSchemaData} import pl.touk.nussknacker.engine.api.NodeId import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceImplFactory @@ -17,7 +17,7 @@ import scala.reflect.{ClassTag, classTag} * Source factory for specific records - mainly generated from schema. */ class SpecificRecordKafkaAvroSourceFactory[V <: SpecificRecord: ClassTag](schemaRegistryClientFactory: SchemaRegistryClientFactory, - schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, processObjectDependencies: ProcessObjectDependencies, implProvider: KafkaSourceImplFactory[Any, V]) extends KafkaAvroSourceFactory[Any, V](schemaRegistryClientFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, implProvider) { diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/UniversalKafkaSourceFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/UniversalKafkaSourceFactory.scala index 66672e2622a..19cab8cb1a2 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/UniversalKafkaSourceFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/UniversalKafkaSourceFactory.scala @@ -13,7 +13,7 @@ import pl.touk.nussknacker.engine.api.process.{ContextInitializer, ProcessObject import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult, Unknown} import pl.touk.nussknacker.engine.api.{MetaData, NodeId} import pl.touk.nussknacker.engine.avro.KafkaAvroBaseComponentTransformer.SchemaVersionParamName -import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory, SchemaVersionOption} +import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory, SchemaVersionOption} import pl.touk.nussknacker.engine.avro.source.UniversalKafkaSourceFactory.UniversalKafkaSourceFactoryState import pl.touk.nussknacker.engine.avro.typed.AvroSchemaTypeDefinitionExtractor import pl.touk.nussknacker.engine.avro.{KafkaAvroBaseTransformer, ParsedSchemaDeterminer, RuntimeSchemaData} @@ -28,7 +28,7 @@ import scala.reflect.ClassTag * TODO: Move it to some other module when json schema handling will be available */ class UniversalKafkaSourceFactory[K: ClassTag, V: ClassTag](val schemaRegistryClientFactory: SchemaRegistryClientFactory, - val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, val processObjectDependencies: ProcessObjectDependencies, protected val implProvider: KafkaSourceImplFactory[K, V]) extends SourceFactory diff --git a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/delayed/DelayedKafkaAvroSourceFactory.scala b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/delayed/DelayedKafkaAvroSourceFactory.scala index 49d04b035a6..86ccba0ec28 100644 --- a/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/delayed/DelayedKafkaAvroSourceFactory.scala +++ b/utils/avro-components-utils/src/main/scala/pl/touk/nussknacker/engine/avro/source/delayed/DelayedKafkaAvroSourceFactory.scala @@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies import pl.touk.nussknacker.engine.avro.KafkaAvroBaseComponentTransformer.SchemaVersionParamName -import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedMessagesSerdeProvider, SchemaRegistryClientFactory} +import pl.touk.nussknacker.engine.avro.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory} import pl.touk.nussknacker.engine.avro.source.KafkaAvroSourceFactory import pl.touk.nussknacker.engine.api.NodeId import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory.KafkaSourceImplFactory @@ -15,7 +15,7 @@ import pl.touk.nussknacker.engine.kafka.source.delayed.DelayedKafkaSourceFactory import scala.reflect.ClassTag class DelayedKafkaAvroSourceFactory[K: ClassTag, V: ClassTag](schemaRegistryClientFactory: SchemaRegistryClientFactory, - schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider, + schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider, processObjectDependencies: ProcessObjectDependencies, implProvider: KafkaSourceImplFactory[K, V]) extends KafkaAvroSourceFactory[K, V](schemaRegistryClientFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, implProvider) {