Using ParsedSchema as long as it is possible #3
arkadius committed Jul 19, 2022
1 parent ec518b9 commit e66c507
Showing 22 changed files with 64 additions and 71 deletions.
4 changes: 3 additions & 1 deletion docs/Changelog.md
@@ -33,7 +33,9 @@
* [#3238](https://github.com/TouK/nussknacker/pull/3238) K8 runtime's logback conf can be stored in single ConfigMap for all runtime pods
* [#3201](https://github.com/TouK/nussknacker/pull/3201) Added literal types
* [#3240](https://github.com/TouK/nussknacker/pull/3240) Error topic created by default if not exists
- * [#3245](https://github.com/TouK/nussknacker/pull/3245) [#3265](https://github.com/TouK/nussknacker/pull/3265) [#3288](https://github.com/TouK/nussknacker/pull/3288) [#3295](https://github.com/TouK/nussknacker/pull/3295) Universal kafka source/sink, handling multiple scenarios like: avro message for avro schema, json message for json schema
+ * [#3245](https://github.com/TouK/nussknacker/pull/3245) [#3265](https://github.com/TouK/nussknacker/pull/3265)
+ [#3288](https://github.com/TouK/nussknacker/pull/3288) [#3295](https://github.com/TouK/nussknacker/pull/3295) [#3297](https://github.com/TouK/nussknacker/pull/3297)
+ Universal kafka source/sink, handling multiple scenarios like: avro message for avro schema, json message for json schema
* [#3249](https://github.com/TouK/nussknacker/pull/3249) Confluent 5.5->7.2, avro 1.9->1.11 bump
* [#3250](https://github.com/TouK/nussknacker/pull/3250) Kafka 2.4 -> 3.2, Flink 1.14.4 -> 1.14.5
* [#3270](https://github.com/TouK/nussknacker/pull/3270) Added type representing null
12 changes: 8 additions & 4 deletions docs/MigrationGuide.md
@@ -26,13 +26,17 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* Invalid is a representation of process compilation errors
* Valid is a representation of a positive or negative scenario running result
* [#3255](https://github.com/TouK/nussknacker/pull/3255) `TestReporter` util class is safer to use in parallel tests, methods require passing scenario name
- * [#3265](https://github.com/TouK/nussknacker/pull/3265) Initial work on UniversalKafkaSource/Sink:
+ * [#3265](https://github.com/TouK/nussknacker/pull/3265) [#3288](https://github.com/TouK/nussknacker/pull/3288) [#3297](https://github.com/TouK/nussknacker/pull/3297) Changes related to UniversalKafkaSource/Sink:
* `RuntimeSchemaData` is generic - parametrized by `ParsedSchema` (but only AvroSchema is supported for now).
* `NkSerializableAvroSchema` renamed to `NkSerializableParsedSchema`
* `SchemaWithMetadata` wraps `ParsedSchema` instead of avro `Schema`.
+ * `SchemaRegistryProvider` refactoring:
+   * rename `SchemaRegistryProvider` to `SchemaBasedSerdeProvider`
+   * decouple `SchemaRegistryClientFactory` from `SchemaBasedSerdeProvider`
+   * `KafkaAvroKeyValueDeserializationSchemaFactory` renamed to `KafkaSchemaBasedKeyValueDeserializationSchemaFactory`
+   * `KafkaAvroValueSerializationSchemaFactory` renamed to `KafkaSchemaBasedValueSerializationSchemaFactory`
+   * `KafkaAvroKeyValueSerializationSchemaFactory` renamed to `KafkaSchemaBasedKeyValueSerializationSchemaFactory`
* [#3253](https://github.com/TouK/nussknacker/pull/3253) `DeploymentManager` has a separate `validate` method, which should perform initial scenario validation and return reasonably quickly (while deploy can e.g. take a Flink savepoint, etc.)
- * [#3288](https://github.com/TouK/nussknacker/pull/3288) `SchemaRegistryProvider` refactoring:
-   * rename `SchemaRegistryProvider` to `SchemaBasedMessagesSerdeProvider`
-   * decouple `BaseSchemaRegistryProvider` from `SchemaBasedMessagesSerdeProvider`

### REST API changes

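For downstream code the change is mostly a mechanical rename, plus a type parameter for `RuntimeSchemaData`. A minimal before/after sketch in Scala, assuming only class and factory names that appear in the diffs on this page (the `SerdeWiring` object itself is hypothetical):

```scala
// Before (pre-e66c507), the provider type carried "Messages" in its name:
// import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
// val serdeProvider: SchemaBasedMessagesSerdeProvider =
//   ConfluentSchemaBasedSerdeProvider.avroPayload(CachedConfluentSchemaRegistryClientFactory)

// After, the same wiring with the renamed type:
import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.CachedConfluentSchemaRegistryClientFactory

object SerdeWiring {
  // avroPayload is the avro-serialization variant; jsonPayload is its JSON counterpart
  val serdeProvider: SchemaBasedSerdeProvider =
    ConfluentSchemaBasedSerdeProvider.avroPayload(CachedConfluentSchemaRegistryClientFactory)
}
```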
@@ -4,7 +4,7 @@ import org.apache.avro.specific.SpecificRecord
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.process._
import pl.touk.nussknacker.engine.avro.schema.{GeneratedAvroClassSample, GeneratedAvroClassWithLogicalTypes}
- import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
+ import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory
@@ -62,7 +62,7 @@ abstract class KafkaAvroTestProcessConfigCreator extends EmptyProcessConfigCreat

protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory

- protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
+ protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)

}

@@ -6,7 +6,7 @@ import pl.touk.nussknacker.engine.avro.helpers.{KafkaAvroSpecMixin, SimpleKafkaJ
import pl.touk.nussknacker.engine.avro.schema.{GeneratedAvroClassSampleSchema, PaymentV1}
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
- import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider}
+ import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider}
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler
import pl.touk.nussknacker.engine.process.registrar.FlinkProcessRegistrar
import pl.touk.nussknacker.engine.testing.LocalModelData
@@ -17,7 +17,7 @@ class KafkaJsonPayloadIntegrationSpec extends FunSuite with KafkaAvroSpecMixin w
import KafkaAvroIntegrationMockSchemaRegistry._

private lazy val creator: KafkaAvroTestProcessConfigCreator = new KafkaAvroTestProcessConfigCreator {
- override protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider =
+ override protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider =
ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)

override protected def schemaRegistryClientFactory = new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient)
@@ -19,7 +19,7 @@ import pl.touk.nussknacker.engine.avro.encode.ValidationMode
import pl.touk.nussknacker.engine.avro.kryo.AvroSerializersRegistrar
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
- import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaBasedMessagesSerdeProvider, SchemaVersionOption}
+ import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, LatestSchemaVersion, SchemaBasedSerdeProvider, SchemaVersionOption}
import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactory
import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory
import pl.touk.nussknacker.engine.avro.source.{KafkaAvroSourceFactory, SpecificRecordKafkaAvroSourceFactory}
@@ -51,7 +51,7 @@ trait KafkaAvroSpecMixin extends FunSuite with KafkaWithSchemaRegistryOperations
protected def confluentClientFactory: ConfluentSchemaRegistryClientFactory

// In default test scenario we use avro payload.
- protected lazy val schemaBasedMessagesSerdeProvider: SchemaBasedMessagesSerdeProvider =
+ protected lazy val schemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider =
ConfluentSchemaBasedSerdeProvider.avroPayload(confluentClientFactory)

protected def executionConfigPreparerChain(modelData: LocalModelData): ExecutionConfigPreparer =
@@ -6,7 +6,7 @@ import org.scalatest.prop.TableDrivenPropertyChecks
import pl.touk.nussknacker.engine.avro.TestSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.avro.helpers._
import pl.touk.nussknacker.engine.avro.schema.FullNameV1
- import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
+ import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientBuilder}
import pl.touk.nussknacker.engine.kafka.KafkaClient
@@ -38,7 +38,7 @@ trait ConfluentKafkaAvroSeDeSpecMixin extends SchemaRegistryMixin with TableDrivenPropertyChecks
}

case class SchemaRegistryProviderSetup(`type`: SchemaRegistryProviderSetupType.Value,
- provider: SchemaBasedMessagesSerdeProvider,
+ provider: SchemaBasedSerdeProvider,
override val valueSerializer: Serializer[Any],
valueDeserializer: Deserializer[Any]) extends KafkaWithSchemaRegistryOperations {

@@ -9,7 +9,7 @@ import pl.touk.nussknacker.engine.avro.helpers.KafkaAvroSpecMixin
import pl.touk.nussknacker.engine.avro.schema.TestSchemaWithRecord
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory}
- import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedMessagesSerdeProvider}
+ import pl.touk.nussknacker.engine.avro.schemaregistry.{ExistingSchemaVersion, SchemaBasedSerdeProvider}
import pl.touk.nussknacker.engine.avro.{AvroUtils, KafkaAvroTestProcessConfigCreator}
import pl.touk.nussknacker.engine.graph.expression
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompiler
@@ -4,7 +4,6 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigValueFactory.fromAnyRef
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
- import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.avro.sink.flink.FlinkKafkaAvroSinkImplFactory
@@ -20,12 +19,6 @@ class FlinkKafkaComponentProvider extends ComponentProvider {

protected def schemaRegistryClientFactory: ConfluentSchemaRegistryClientFactory = CachedConfluentSchemaRegistryClientFactory

- protected def createAvroSchemaRegistryProvider: SchemaBasedMessagesSerdeProvider =
-   ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
-
- protected def createJsonSchemaRegistryProvider: SchemaBasedMessagesSerdeProvider =
-   ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)

override def providerName: String = "kafka"

override def resolveConfigForExecution(config: Config): Config = config
@@ -38,25 +31,26 @@ class FlinkKafkaComponentProvider extends ComponentProvider {
val schemaRegistryTypedJson = "DataSourcesAndSinks#schema-registry--json-serialization"
val noTypeInfo = "DataSourcesAndSinks#no-type-information--json-serialization"

- val avroSerializingSchemaRegistryProvider = createAvroSchemaRegistryProvider
- val jsonSerializingSchemaRegistryProvider = createJsonSchemaRegistryProvider
+ val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(schemaRegistryClientFactory)
+ val jsonPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)
+ val universalSerdeProvider = ConfluentSchemaBasedSerdeProvider.universal(schemaRegistryClientFactory)

val lowLevelKafkaComponents = List(
ComponentDefinition("kafka-json", new GenericKafkaJsonSinkFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
ComponentDefinition("kafka-json", new GenericJsonSourceFactory(overriddenDependencies)).withRelativeDocs(noTypeInfo),
ComponentDefinition("kafka-typed-json", new GenericTypedJsonSourceFactory(overriddenDependencies)).withRelativeDocs("DataSourcesAndSinks#manually-typed--json-serialization"),
ComponentDefinition("kafka-avro", new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka-avro", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-avro-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro)
ComponentDefinition("kafka-avro", new KafkaAvroSourceFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka-avro", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSourceFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json", new KafkaAvroSinkFactoryWithEditor(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-registry-typed-json-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, jsonPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(schemaRegistryTypedJson),
ComponentDefinition("kafka-avro-raw", new KafkaAvroSinkFactory(schemaRegistryClientFactory, avroPayloadSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro)
)

// TODO: change link to the documentation when json schema handling will be available
val universalKafkaComponents = List(
ComponentDefinition("kafka", new UniversalKafkaSourceFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka", new UniversalKafkaSinkFactory(schemaRegistryClientFactory, avroSerializingSchemaRegistryProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro))
ComponentDefinition("kafka", new UniversalKafkaSourceFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, new FlinkKafkaSourceImplFactory(None))).withRelativeDocs(avro),
ComponentDefinition("kafka", new UniversalKafkaSinkFactory(schemaRegistryClientFactory, universalSerdeProvider, overriddenDependencies, FlinkKafkaAvroSinkImplFactory)).withRelativeDocs(avro))

lowLevelKafkaComponents ::: universalKafkaComponents
}
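With the two `create*SchemaRegistryProvider` methods gone, provider construction reduces to three factory calls on `ConfluentSchemaBasedSerdeProvider`. A short sketch of the distinction, reusing only names from this diff (grouping them in a `SerdeProviders` object is an assumption for illustration):

```scala
import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}

object SerdeProviders {
  private val clientFactory: ConfluentSchemaRegistryClientFactory =
    CachedConfluentSchemaRegistryClientFactory

  // Payload-specific providers back the format-specific components
  // ("kafka-avro", "kafka-registry-typed-json", ...):
  val avroPayload: SchemaBasedSerdeProvider =
    ConfluentSchemaBasedSerdeProvider.avroPayload(clientFactory)
  val jsonPayload: SchemaBasedSerdeProvider =
    ConfluentSchemaBasedSerdeProvider.jsonPayload(clientFactory)

  // The universal provider backs the single "kafka" source/sink pair and picks
  // the serialization based on the schema registered for the topic:
  val universal: SchemaBasedSerdeProvider =
    ConfluentSchemaBasedSerdeProvider.universal(clientFactory)
}
```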
@@ -14,7 +14,7 @@ import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{ComponentGroupName, ParameterConfig, SingleComponentConfig}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, MandatoryParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.process._
- import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
+ import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkFactoryWithEditor
@@ -67,13 +67,13 @@ class DevProcessConfigCreator extends ProcessConfigCreator {

override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
val confluentFactory = createSchemaRegistryClientFactory(processObjectDependencies)
- val schemaBasedMessagesSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
+ val avroPayloadSerdeProvider = ConfluentSchemaBasedSerdeProvider.avroPayload(confluentFactory)
Map(
"sendSms" -> all(new SingleValueSinkFactory(new DiscardingSink)),
"monitor" -> categories(SinkFactory.noParam(EmptySink)),
"communicationSink" -> categories(DynamicParametersSink),
"kafka-string" -> all(new KafkaSinkFactory(new SimpleSerializationSchema[AnyRef](_, String.valueOf), processObjectDependencies, FlinkKafkaSinkImplFactory)),
"kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(confluentFactory, schemaBasedMessagesSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory))
"kafka-avro" -> all(new KafkaAvroSinkFactoryWithEditor(confluentFactory, avroPayloadSerdeProvider, processObjectDependencies, FlinkKafkaAvroSinkImplFactory))
)
}

@@ -1,7 +1,7 @@
package pl.touk.nussknacker.defaultmodel

import pl.touk.nussknacker.defaultmodel.MockSchemaRegistry.schemaRegistryMockClient
- import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedMessagesSerdeProvider
+ import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.MockConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.flink.util.transformer.FlinkKafkaComponentProvider
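The test diffs above all follow one pattern: swap the real schema registry client for a mock while keeping the provider type. A hedged sketch of that pattern, built only from identifiers visible in these diffs (the `TestWiring` wrapper is hypothetical):

```scala
import pl.touk.nussknacker.engine.avro.KafkaAvroTestProcessConfigCreator
import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}

object TestWiring {
  val schemaRegistryMockClient = new MockSchemaRegistryClient

  val creator: KafkaAvroTestProcessConfigCreator = new KafkaAvroTestProcessConfigCreator {
    // Route schema lookups to the in-memory mock instead of a live registry:
    override protected def schemaRegistryClientFactory =
      new MockConfluentSchemaRegistryClientFactory(schemaRegistryMockClient)

    // Override the payload format under test (the default is avro):
    override protected def createSchemaBasedMessagesSerdeProvider: SchemaBasedSerdeProvider =
      ConfluentSchemaBasedSerdeProvider.jsonPayload(schemaRegistryClientFactory)
  }
}
```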
