From 787efed762049b7460ef48a2806ed050b844ca06 Mon Sep 17 00:00:00 2001 From: Grzegorz Skrobisz Date: Tue, 11 May 2021 14:20:54 +0200 Subject: [PATCH] CR fixes part 2, TypeInformation for InputMeta --- docs/MigrationGuide.md | 15 ++- .../javademo/DemoProcessConfigCreator.java | 3 +- .../demo/DemoProcessConfigCreator.scala | 6 +- .../TypeInformationDetection.scala | 9 ++ ...ultAwareTypeInformationCustomisation.scala | 4 +- ...erRecordDeserializationSchemaFactory.scala | 6 +- .../ConsumerRecordToJsonFormatter.scala | 70 +++++++++---- ...edValueDeserializaitionSchemaFactory.scala | 19 ++++ .../engine/kafka/generic/sources.scala | 14 +-- .../engine/kafka/source/InputMeta.scala | 97 ++++++++++++++++--- .../source/KafkaContextInitializer.scala | 2 +- .../kafka/source/KafkaSourceFactory.scala | 51 ++++++++-- .../engine/kafka/KafkaSourceFactorySpec.scala | 19 +++- .../ConsumerRecordDeserializationSpec.scala | 24 +---- .../ConsumerRecordToJsonFormatterSpec.scala | 10 +- .../sample/DevProcessConfigCreator.scala | 5 +- ...gResultAwareTypeInformationDetection.scala | 6 +- .../engine/process/helpers/SampleNodes.scala | 4 +- .../KafkaSourceFactoryIntegrationSpec.scala | 30 +++++- ...fkaSourceFactoryProcessConfigCreator.scala | 25 ++++- .../KafkaSourceFactoryProcessMixin.scala | 16 ++- .../TypeInformationDetectionSpec.scala | 4 +- ...ultAwareTypeInformationDetectionSpec.scala | 5 +- 23 files changed, 333 insertions(+), 111 deletions(-) rename engine/flink/{process/src/main/scala/pl/touk/nussknacker/engine/process => api/src/main/scala/pl/touk/nussknacker/engine/flink/api}/typeinformation/TypingResultAwareTypeInformationCustomisation.scala (64%) create mode 100644 engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/FixedValueDeserializaitionSchemaFactory.scala diff --git a/docs/MigrationGuide.md b/docs/MigrationGuide.md index d6ab807a467..7f11217dc0a 100644 --- a/docs/MigrationGuide.md +++ b/docs/MigrationGuide.md @@ -62,12 +62,17 @@ To see 
biggest differences please consult the [changelog](Changelog.md). * [#1557](https://github.com/TouK/nussknacker/pull/1556) Some classes from standalone engine were moved to standalone api to remove engine to (model) utils dependency: `StandaloneContext`, `StandaloneContextLifecycle`, `MetricsProvider` * [#1558](https://github.com/TouK/nussknacker/pull/1558) `FlinkProcessRegistrar` takes configuration directly from `FlinkProcessCompiler` (this can affect some tests setup) -* [#1512](https://github.com/TouK/nussknacker/pull/1512) New kafka source `KafkaGenericNodeSourceFactory`, which is based on `GenericNodeTransformation`, gives access to setup of `ValidationContext` and `Context` initialization. - It uses `KafkaGenericContextInitializer` to initialize `Context` with additional variable with kafka event metadata. Factory requires proper deserialization to `ConsumerRecord` (see `ConsumerRecordDeserializationSchemaFactory`). - To replace basic `KafkaSourceFactory` with `KafkaGenericNodeSourceFactory`: - - use `ConsumerRecordDeserializationSchemaFactory` with current `DeserializationSchema` as a value deserializer, add key deserializer +* [#1512](https://github.com/TouK/nussknacker/pull/1512) Replaced `KafkaSourceFactory` with source based on `GenericNodeTransformation`, which gives access to setup of `ValidationContext` and `Context` initialization. + It uses `KafkaContextInitializer` to initialize `Context` with additional variable containing kafka event metadata. Source factory requires deserialization to `ConsumerRecord` (see `ConsumerRecordDeserializationSchemaFactory`). 
+ To migrate `KafkaSourceFactory`: + - provide deserializer factory: + - use `ConsumerRecordDeserializationSchemaFactory` with current `DeserializationSchema` as a value deserializer, add key deserializer + - or use `FixedValueDeserializaitionSchemaFactory` with simple key-as-string deserializer - current `RecordFormater` should be sufficient for value-only serialization, or use `ConsumerRecordToJsonFormatter` for metadata serialization - - implement timestampAssigner that is able to extract time from `ConsumerRecord[K, V]` + - provide timestampAssigner that is able to extract time from `ConsumerRecord[K, V]` + Also: + - removed `BaseKafkaSourceFactory` with multiple topics support: use `KafkaSourceFactory` instead, see test "source with two input topics" + - removed `SingleTopicKafkaSourceFactory`: use `KafkaSourceFactory` with custom `prepareInitialParameters`, `contextTransformation` and `extractTopics` to alter parameter list and provide constant topic value. ## In version 0.3.0 diff --git a/engine/demo/src/main/java/pl/touk/nussknacker/engine/javademo/DemoProcessConfigCreator.java b/engine/demo/src/main/java/pl/touk/nussknacker/engine/javademo/DemoProcessConfigCreator.java index 76b3a6ba018..254cbcf870d 100644 --- a/engine/demo/src/main/java/pl/touk/nussknacker/engine/javademo/DemoProcessConfigCreator.java +++ b/engine/demo/src/main/java/pl/touk/nussknacker/engine/javademo/DemoProcessConfigCreator.java @@ -28,6 +28,7 @@ import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler; import pl.touk.nussknacker.engine.javaapi.process.ExpressionConfig; import pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator; +import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory; import pl.touk.nussknacker.engine.kafka.generic.sources; import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchemaFactory; import pl.touk.nussknacker.engine.kafka.serialization.schemas; @@ -86,7 +87,7 
@@ public TypeInformation getProducedType() { } }; return new KafkaSourceFactory<>( - new sources.EspValueDeserializaitionSchemaFactory<>(schema), + new FixedValueDeserializaitionSchemaFactory<>(schema), Option.apply(extractor), sources.JsonRecordFormatter$.MODULE$, processObjectDependencies, diff --git a/engine/demo/src/main/scala/pl/touk/nussknacker/engine/demo/DemoProcessConfigCreator.scala b/engine/demo/src/main/scala/pl/touk/nussknacker/engine/demo/DemoProcessConfigCreator.scala index f57b0b55419..e3c3c0fa820 100644 --- a/engine/demo/src/main/scala/pl/touk/nussknacker/engine/demo/DemoProcessConfigCreator.scala +++ b/engine/demo/src/main/scala/pl/touk/nussknacker/engine/demo/DemoProcessConfigCreator.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.demo import java.time.Duration + import com.typesafe.config.Config import io.circe.Json import org.apache.flink.api.common.typeinfo.TypeInformation @@ -19,7 +20,8 @@ import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestam import pl.touk.nussknacker.engine.flink.util.exception.BrieflyLoggingExceptionHandler import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema import pl.touk.nussknacker.engine.flink.util.transformer.{TransformStateTransformer, UnionTransformer} -import pl.touk.nussknacker.engine.kafka.generic.sources.{EspValueDeserializaitionSchemaFactory, JsonRecordFormatter} +import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory +import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema import pl.touk.nussknacker.engine.kafka.sink.KafkaSinkFactory import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory @@ -71,7 +73,7 @@ class DemoProcessConfigCreator extends ProcessConfigCreator { timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, T]]], processObjectDependencies: 
ProcessObjectDependencies): FlinkSourceFactory[ConsumerRecord[String, T]] = { val schema = new EspDeserializationSchema[T](bytes => decode(bytes)) - val schemaFactory = new EspValueDeserializaitionSchemaFactory(schema) + val schemaFactory = new FixedValueDeserializaitionSchemaFactory(schema) new KafkaSourceFactory[String, T](schemaFactory, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(classTag[String], ClassTag(implicitly[TypeInformation[T]].getTypeClass)) } diff --git a/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala b/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala index 5877361a785..42f51c5997e 100644 --- a/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala +++ b/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala @@ -20,3 +20,12 @@ trait TypeInformationDetection extends Serializable { def forValueWithContext[T](validationContext: ValidationContext, value: TypingResult): TypeInformation[ValueWithContext[T]] } + +/** + * Trait that allows for providing more detailed TypeInformation when TypingResult is known. 
+ */ +trait TypeInformationDetectionForTypingResult extends TypeInformationDetection { + + def forType(typingResult: TypingResult): TypeInformation[Any] + +} diff --git a/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationCustomisation.scala b/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypingResultAwareTypeInformationCustomisation.scala similarity index 64% rename from engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationCustomisation.scala rename to engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypingResultAwareTypeInformationCustomisation.scala index 8ae609a3545..75421de17d4 100644 --- a/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationCustomisation.scala +++ b/engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypingResultAwareTypeInformationCustomisation.scala @@ -1,4 +1,4 @@ -package pl.touk.nussknacker.engine.process.typeinformation +package pl.touk.nussknacker.engine.flink.api.typeinformation import org.apache.flink.api.common.typeinfo.TypeInformation import pl.touk.nussknacker.engine.api.typed.typing.TypingResult @@ -9,6 +9,6 @@ import pl.touk.nussknacker.engine.api.typed.typing.TypingResult */ trait TypingResultAwareTypeInformationCustomisation { - def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]] + def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[TypingResult, TypeInformation[_]] } diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala 
b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala index 1b8cf44b11f..e04be4f4e6c 100644 --- a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala @@ -52,12 +52,16 @@ abstract class ConsumerRecordDeserializationSchemaFactory[K, V] extends KafkaDes record.serializedValueSize(), key, value, - record.headers() + record.headers(), + record.leaderEpoch() ) } override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false + // TODO: Provide better way to calculate TypeInformation. Here in case of serialization (of generic type) Kryo is used. + // It is assumed that while this ConsumerRecord[K, V] object lifespan is short, inside of source, this implementation + // is sufficient. 
override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = { val clazz = classTag[ConsumerRecord[K, V]].runtimeClass.asInstanceOf[Class[ConsumerRecord[K, V]]] TypeInformation.of(clazz) diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatter.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatter.scala index 6766bd22666..ffa2deef1d2 100644 --- a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatter.scala +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatter.scala @@ -1,8 +1,8 @@ package pl.touk.nussknacker.engine.kafka.consumerrecord import java.nio.charset.StandardCharsets +import java.util.Optional -import com.github.ghik.silencer.silent import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} import io.circe.{Decoder, Encoder} import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema} @@ -14,16 +14,20 @@ import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils} import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, RecordFormatter} import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord._ -import scala.annotation.nowarn - -@silent("deprecated") -@nowarn("cat=deprecation") +/** + * RecordFormatter used to encode and decode whole raw kafka event (ConsumerRecord) in json format. 
+ * + * @param deserializationSchema - schema used to convert raw kafka event to serializable representation (see SerializableConsumerRecord) + * @param serializationSchema - schema used to convert serializable representation to raw kafka event + * @tparam K - event key type + * @tparam V - event value type + */ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]], serializationSchema: KafkaSerializationSchema[ConsumerRecord[K, V]]) extends RecordFormatter { - implicit val consumerRecordDecoder: Decoder[SerializableConsumerRecord[K, V]] = deriveDecoder - implicit val consumerRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveEncoder + protected val consumerRecordDecoder: Decoder[SerializableConsumerRecord[K, V]] = deriveDecoder + protected val consumerRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveEncoder override protected def formatRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Array[Byte] = { val deserializedRecord = deserializationSchema.deserialize(record) @@ -34,14 +38,16 @@ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](dese Option(deserializedRecord.partition()), Option(deserializedRecord.offset()), Option(deserializedRecord.timestamp()), - Option(ConsumerRecordUtils.toMap(deserializedRecord.headers()).mapValues(s => Option(s))) + Option(deserializedRecord.timestampType().name), + Option(ConsumerRecordUtils.toMap(deserializedRecord.headers()).mapValues(s => Option(s))), + Option(deserializedRecord.leaderEpoch().orElse(null)).map(_.intValue()) //avoids covert null -> 0 conversion ) - implicitly[Encoder[SerializableConsumerRecord[K, V]]].apply(serializableRecord).noSpaces.getBytes(StandardCharsets.UTF_8) + consumerRecordEncoder(serializableRecord).noSpaces.getBytes(StandardCharsets.UTF_8) } override protected def parseRecord(topic: String, bytes: Array[Byte]): ConsumerRecord[Array[Byte], 
Array[Byte]] = { - val serializableRecord = CirceUtil.decodeJsonUnsafe[SerializableConsumerRecord[K, V]](bytes) // decode json in SerializableConsumerRecord[K, V] domain - val serializableConsumerRecord = SerializableConsumerRecord.from(topic, serializableRecord) // update with defaults if fields are missing in json + val serializableRecord = CirceUtil.decodeJsonUnsafe(bytes)(consumerRecordDecoder) // decode json in SerializableConsumerRecord[K, V] domain + val serializableConsumerRecord = toConsumerRecord(topic, serializableRecord) // update with defaults if fields are missing in json // Here serialization schema and ProducerRecord are used to transform key and value to proper Array[Byte]. // Other properties are ignored by serializer and are based on values provided by decoded json (or default empty values). val producerRecord = serializationSchema.serialize(serializableConsumerRecord, serializableConsumerRecord.timestamp()) // serialize K and V to Array[Byte] @@ -50,9 +56,11 @@ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](dese serializableConsumerRecord.partition, serializableConsumerRecord.offset, serializableConsumerRecord.timestamp, + serializableConsumerRecord.timestampType(), producerRecord.key(), producerRecord.value(), - producerRecord.headers() + producerRecord.headers(), + serializableConsumerRecord.leaderEpoch() ) } @@ -60,28 +68,50 @@ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](dese } +/** + * Wrapper for ConsumerRecord fields used for test data serialization, eg. json serialization. + * All fields apart from value are optional. 
+ */ +case class SerializableConsumerRecord[K, V](key: Option[K], + value: V, + topic: Option[String], + partition: Option[Int], + offset: Option[Long], + timestamp: Option[Long], + timestampType: Option[String], + headers: Option[Map[String, Option[String]]], + leaderEpoch: Option[Int]) { -case class SerializableConsumerRecord[K, V](key: Option[K], value: V, topic: Option[String], partition: Option[Int], offset: Option[Long], timestamp: Option[Long], headers: Option[Map[String, Option[String]]]) +} object SerializableConsumerRecord { - def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, key: K, value: V, headers: Headers): ConsumerRecord[K, V] = { - new ConsumerRecord(topic, partition, offset, timestamp, - TimestampType.NO_TIMESTAMP_TYPE, ConsumerRecord.NULL_CHECKSUM.longValue(), - ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, - key, value, headers + /** + * Creates ConsumerRecord with default: checksum, serializedKeySize and serializedValueSize. + */ + def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, timestampType: TimestampType, key: K, value: V, headers: Headers, leaderEpoch: Optional[Integer]): ConsumerRecord[K, V] = { + new ConsumerRecord(topic, partition, offset, + timestamp, timestampType, + ConsumerRecord.NULL_CHECKSUM.longValue(), ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, + key, value, headers, + leaderEpoch ) } - def from[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = { + /** + * Converts SerializableConsumerRecord to ConsumerRecord, uses default values in case of missing values. 
+ */ + def toConsumerRecord[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = { createConsumerRecord( record.topic.getOrElse(topic), record.partition.getOrElse(0), record.offset.getOrElse(0L), record.timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP), + record.timestampType.map(TimestampType.forName).getOrElse(TimestampType.NO_TIMESTAMP_TYPE), record.key.getOrElse(null.asInstanceOf[K]), record.value, - ConsumerRecordUtils.toHeaders(record.headers.map(_.mapValues(_.orNull)).getOrElse(Map.empty)) + ConsumerRecordUtils.toHeaders(record.headers.map(_.mapValues(_.orNull)).getOrElse(Map.empty)), + Optional.ofNullable(record.leaderEpoch.map(Integer.valueOf).orNull) //avoids covert null -> 0 conversion ) } } diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/FixedValueDeserializaitionSchemaFactory.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/FixedValueDeserializaitionSchemaFactory.scala new file mode 100644 index 00000000000..94c2e3b10ff --- /dev/null +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/FixedValueDeserializaitionSchemaFactory.scala @@ -0,0 +1,19 @@ +package pl.touk.nussknacker.engine.kafka.consumerrecord + +import org.apache.flink.api.common.serialization.DeserializationSchema +import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer} +import pl.touk.nussknacker.engine.kafka.KafkaConfig + +/** + * Wrapper for value-only DeserializationSchema. For kafka event key data it uses simple "Array[Byte] to String" deserialization. + * Used with simple, value-only, sources where event key is empty or ignored. + * + * @param valueSchema - value deserialization schema (e.g. 
EspDeserializationSchema) + * @tparam V - type of value of deserialized ConsumerRecord + */ +class FixedValueDeserializaitionSchemaFactory[V](valueSchema: DeserializationSchema[V]) extends ConsumerRecordDeserializationSchemaFactory[String, V]{ + override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] = new StringDeserializer + override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[V] = new Deserializer[V] { + override def deserialize(topic: String, data: Array[Byte]): V = valueSchema.deserialize(data) + } +} diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/sources.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/sources.scala index 51b58201403..6a6fe011f66 100644 --- a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/sources.scala +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/generic/sources.scala @@ -3,20 +3,19 @@ package pl.touk.nussknacker.engine.kafka.generic import java.nio.charset.StandardCharsets import java.util import java.util.Collections + import io.circe.{Decoder, Json, JsonObject} -import org.apache.flink.api.common.serialization.DeserializationSchema import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer} import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Source, TestDataGenerator} import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils} import pl.touk.nussknacker.engine.api.typed._ import pl.touk.nussknacker.engine.api.{CirceUtil, MethodToInvoke, ParamName} import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory 
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema -import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordDeserializationSchemaFactory +import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory import pl.touk.nussknacker.engine.kafka.source.{KafkaSource, KafkaSourceFactory} import pl.touk.nussknacker.engine.kafka.{BasicFormatter, KafkaConfig, KafkaUtils} import pl.touk.nussknacker.engine.util.Implicits._ @@ -27,15 +26,8 @@ object sources { import collection.JavaConverters._ - class EspValueDeserializaitionSchemaFactory[T](schema: DeserializationSchema[T]) extends ConsumerRecordDeserializationSchemaFactory[String, T]{ - override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] = new StringDeserializer - override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[T] = new Deserializer[T] { - override def deserialize(topic: String, data: Array[Byte]): T = schema.deserialize(data) - } - } - class GenericJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, java.util.Map[_, _]]( - new EspValueDeserializaitionSchemaFactory(JsonMapDeserialization), None, JsonRecordFormatter, processObjectDependencies) + new FixedValueDeserializaitionSchemaFactory(JsonMapDeserialization), None, JsonRecordFormatter, processObjectDependencies) class GenericTypedJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends FlinkSourceFactory[TypedMap] with Serializable { diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/InputMeta.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/InputMeta.scala index 6bb442e2f3d..2e64df6e019 100644 --- a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/InputMeta.scala +++ 
b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/InputMeta.scala @@ -1,27 +1,92 @@ package pl.touk.nussknacker.engine.kafka.source +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.typeutils.MapTypeInfo +import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, ScalaCaseClassSerializer} +import org.apache.kafka.common.record.TimestampType import pl.touk.nussknacker.engine.api.typed.typing -import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedObjectTypingResult, TypingResult} +import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult} +import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypeInformationDetectionForTypingResult, TypingResultAwareTypeInformationCustomisation} -import scala.reflect.ClassTag +import scala.collection.immutable.ListMap -case class InputMeta[K](key: K, topic: String, partition: Int, offset: java.lang.Long, timestamp: java.lang.Long, headers: java.util.Map[String, String]) +/** + * InputMeta represents kafka event metadata. It is based on [[org.apache.kafka.clients.consumer.ConsumerRecord]]. + * Ignored fields: checksum, serializedKeySize, serializedValueSize. 
+ * + * @param key - event key + * @param topic - kafka topic + * @param partition - kafka partition + * @param offset - event offset + * @param timestamp - event timestamp + * @param timestampType - see [[org.apache.kafka.common.record.TimestampType]] + * @param headers - event headers converted to map + * @param leaderEpoch - number of leaders previously assigned by the controller (> 0 indicates leader failure) + * @tparam K - type of event key + */ +case class InputMeta[K](key: K, + topic: String, + partition: Integer, + offset: java.lang.Long, + timestamp: java.lang.Long, + timestampType: TimestampType, + headers: java.util.Map[String, String], + leaderEpoch: Integer + ) extends BaseInputMeta object InputMeta { - // TODO: provide better type definition for InputMeta[AnyRef] - // objType should contain K type information - def withType(keyTypingResult: typing.TypingResult): typing.TypingResult = { - TypedObjectTypingResult( - Map( - "key" -> keyTypingResult, - "topic" -> Typed[String], - "partition" -> Typed[Int], - "offset" -> Typed[java.lang.Long], - "timestamp" -> Typed[java.lang.Long], - "headers" -> Typed.typedClass[Map[String, String]] - ), - Typed.typedClass[InputMeta[AnyRef]] + val keyParameterName: String = "key" + + /** + * Provides definition of whole metadata object, with given key type definition (keyTypingResult). + * Here "BaseInputMeta" is used to allow proper type information detection when type erasing occurs. 
+ * See also [[InputMetaAwareTypeInformationCustomisation]] + */ + def withType(keyTypingResult: typing.TypingResult): typing.TypingResult = + TypedObjectTypingResult(ListMap(keyParameterName -> keyTypingResult), Typed.typedClass[BaseInputMeta]) + + def typeInformation[K](keyTypeInformation: TypeInformation[K]): CaseClassTypeInfo[InputMeta[K]] = { + val fieldNames = List(keyParameterName, "topic", "partition", "offset", "timestamp", "timestampType", "headers", "leaderEpoch") + val fieldTypes = List( + keyTypeInformation, + TypeInformation.of(classOf[String]), + TypeInformation.of(classOf[Integer]), + TypeInformation.of(classOf[java.lang.Long]), + TypeInformation.of(classOf[java.lang.Long]), + TypeInformation.of(classOf[TimestampType]), + new MapTypeInfo(classOf[String], classOf[String]), + TypeInformation.of(classOf[Integer]) ) + new CaseClassTypeInfo[InputMeta[K]](classOf[InputMeta[K]], Array.empty, fieldTypes, fieldNames){ + override def createSerializer(config: ExecutionConfig): TypeSerializer[InputMeta[K]] = + new ScalaCaseClassSerializer[InputMeta[K]](classOf[InputMeta[K]], fieldTypes.map(_.createSerializer(config)).toArray) + } + } +} + +/** + * BaseInputMeta keeps definition of metadata fields to extract TypingResult. + */ +trait BaseInputMeta { + def topic: String + def partition: Integer + def offset: java.lang.Long + def timestamp: java.lang.Long + def timestampType: TimestampType + def headers: java.util.Map[String, String] + def leaderEpoch: Integer +} + +/** + * Customisation for TypeInformationDetection that provides type information for BaseInputMeta. 
+ */ +class InputMetaAwareTypeInformationCustomisation extends TypingResultAwareTypeInformationCustomisation { + override def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[typing.TypingResult, TypeInformation[_]] = { + case a:TypedObjectTypingResult if a.objType.klass == classOf[BaseInputMeta] => + InputMeta.typeInformation(originalDetection.forType(a.fields(InputMeta.keyParameterName))) } + } \ No newline at end of file diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaContextInitializer.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaContextInitializer.scala index e18de113ab6..71bd139a913 100644 --- a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaContextInitializer.scala +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaContextInitializer.scala @@ -41,7 +41,7 @@ class KafkaContextInitializer[K, V, DefinedParameter <: BaseDefinedParameter, St new BasicContextInitializingFunction[ConsumerRecord[K, V]](processId, taskName) { override def map(input: ConsumerRecord[K, V]): Context = { val headers: java.util.Map[String, String] = ConsumerRecordUtils.toMap(input.headers).asJava - val inputMeta = InputMeta(input.key, input.topic, input.partition, input.offset, input.timestamp, headers) + val inputMeta = InputMeta(input.key, input.topic, input.partition, input.offset, input.timestamp, input.timestampType(), headers, input.leaderEpoch().orElse(null)) newContext .withVariable(VariableConstants.InputVariableName, input.value) .withVariable(VariableConstants.InputMetaVariableName, inputMeta) diff --git a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala index f89c236a398..6c64baeb1a1 100644 --- 
a/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala +++ b/engine/flink/kafka-util/src/main/scala/pl/touk/nussknacker/engine/kafka/source/KafkaSourceFactory.scala @@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchema import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, RecordFormatter} import org.apache.flink.types.Nothing import org.apache.kafka.clients.consumer.ConsumerRecord +import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext} import pl.touk.nussknacker.engine.api.context.transformation.{NodeDependencyValue, SingleInputGenericNodeTransformation} import pl.touk.nussknacker.engine.api.definition._ @@ -19,8 +20,9 @@ import scala.reflect.ClassTag * Base factory for Kafka sources with additional metadata variable. * It is based on [[pl.touk.nussknacker.engine.api.context.transformation.SingleInputGenericNodeTransformation]] * that allows custom ValidationContext and Context transformations, which are provided by [[pl.touk.nussknacker.engine.kafka.source.KafkaContextInitializer]] - * - * Wrapper for [[org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer]] + * Can be used for single- or multi- topic sources (as csv, see topicNameSeparator and extractTopics). 
+ * + * Wrapper for [[org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer]] * Features: * - fetch latest N records which can be later used to test process in UI * Fetching data is defined in [[pl.touk.nussknacker.engine.kafka.source.KafkaSource]] which @@ -40,26 +42,39 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](deserializationSchemaFactory: processObjectDependencies: ProcessObjectDependencies) extends FlinkSourceFactory[ConsumerRecord[K, V]] with SingleInputGenericNodeTransformation[FlinkSource[ConsumerRecord[K, V]]] { + protected val topicNameSeparator = "," + protected val customContextInitializer: KafkaContextInitializer[K, V, DefinedParameter, State] = new KafkaContextInitializer[K, V, DefinedParameter, State](Typed[K], Typed[V]) override type State = Nothing - override def initialParameters: List[Parameter] = Parameter[String](KafkaSourceFactory.TopicParamName) - .copy( - editor = Some(DualParameterEditor(simpleEditor = StringParameterEditor, defaultMode = DualEditorMode.RAW)), - validators = List(MandatoryParameterValidator, NotBlankParameterValidator) - ) :: Nil + // initialParameters should not expose raised exceptions. 
+ override def initialParameters: List[Parameter] = + try { + prepareInitialParameters + } catch { + case e: Exception => Nil + } + // contextTransformation should handle exceptions raised by prepareInitialParameters override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(implicit nodeId: ProcessCompilationError.NodeId) : NodeTransformationDefinition = { - case TransformationStep(Nil, _) => NextParameters(initialParameters) + case step@TransformationStep(Nil, _) => { + try { + NextParameters(prepareInitialParameters) + } catch { + case e: Exception => + val finalErrors = List(CustomNodeError(e.getMessage, Some(KafkaSourceFactory.TopicParamName))) + FinalResults(customContextInitializer.validationContext(context, dependencies, step.parameters, step.state), finalErrors) + } + } case step@TransformationStep((KafkaSourceFactory.TopicParamName, _) :: Nil, None) => FinalResults(customContextInitializer.validationContext(context, dependencies, step.parameters, step.state)) } override def implementation(params: Map[String, Any], dependencies: List[NodeDependencyValue], finalState: Option[Nothing]): FlinkSource[ConsumerRecord[K, V]] = { - val topics = List(params(KafkaSourceFactory.TopicParamName).asInstanceOf[String]) + val topics = extractTopics(params) val preparedTopics = topics.map(KafkaUtils.prepareKafkaTopic(_, processObjectDependencies)) val kafkaConfig = KafkaConfig.parseProcessObjectDependencies(processObjectDependencies) KafkaUtils.validateTopicsExistence(preparedTopics, kafkaConfig) @@ -69,6 +84,24 @@ class KafkaSourceFactory[K: ClassTag, V: ClassTag](deserializationSchemaFactory: } } + /** + * Basic implementation of definition of single topic parameter. + * In case of fetching topics from external repository: return list of topics or raise exception. 
+ */ + protected def prepareInitialParameters: List[Parameter] = Parameter[String](KafkaSourceFactory.TopicParamName) + .copy( + editor = Some(DualParameterEditor(simpleEditor = StringParameterEditor, defaultMode = DualEditorMode.RAW)), + validators = List(MandatoryParameterValidator, NotBlankParameterValidator) + ) :: Nil + + /** + * Extracts topics from default topic parameter. + */ + protected def extractTopics(params: Map[String, Any]): List[String] = { + val paramValue = params(KafkaSourceFactory.TopicParamName).asInstanceOf[String] + paramValue.split(topicNameSeparator).map(_.trim).toList + } + override def nodeDependencies: List[NodeDependency] = Nil } diff --git a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/KafkaSourceFactorySpec.scala b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/KafkaSourceFactorySpec.scala index 1442d9c6cdc..75466604675 100644 --- a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/KafkaSourceFactorySpec.scala +++ b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/KafkaSourceFactorySpec.scala @@ -1,5 +1,7 @@ package pl.touk.nussknacker.engine.kafka +import java.util.Optional + import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.record.TimestampType @@ -45,13 +47,14 @@ class KafkaSourceFactorySpec extends FunSuite with Matchers with KafkaSpec with 0, 0L, constTimestamp, - TimestampType.NO_TIMESTAMP_TYPE, + TimestampType.CREATE_TIME, ConsumerRecord.NULL_CHECKSUM.toLong, ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE, null, givenObj, - ConsumerRecordUtils.emptyHeaders + ConsumerRecordUtils.emptyHeaders, + Optional.of(0) ) pushMessage(new SimpleSerializationSchema[Any](topic, String.valueOf), givenObj, topic, timestamp = constTimestamp) val result = readLastMessage(StringSourceFactory, 
topic).head.asInstanceOf[ConsumerRecord[String, String]] @@ -66,9 +69,11 @@ class KafkaSourceFactorySpec extends FunSuite with Matchers with KafkaSpec with 0, 0L, constTimestamp, + TimestampType.CREATE_TIME, null, givenObj, - ConsumerRecordUtils.emptyHeaders + ConsumerRecordUtils.emptyHeaders, + Optional.of(0) ) pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp) val result = readLastMessage(SampleEventSourceFactory, topic).head.asInstanceOf[ConsumerRecord[String, SampleValue]] @@ -83,9 +88,11 @@ class KafkaSourceFactorySpec extends FunSuite with Matchers with KafkaSpec with 0, 0L, constTimestamp, + TimestampType.CREATE_TIME, null, givenObj, - ConsumerRecordUtils.emptyHeaders + ConsumerRecordUtils.emptyHeaders, + Optional.of(0) ) pushMessage(new JsonSerializationSchema[SampleValue](topic).asInstanceOf[KafkaSerializationSchema[Any]], givenObj, topic, timestamp = constTimestamp) val result = readLastMessage(ConsumerRecordValueSourceFactory, topic).head.asInstanceOf[ConsumerRecord[String, SampleValue]] @@ -100,9 +107,11 @@ class KafkaSourceFactorySpec extends FunSuite with Matchers with KafkaSpec with 0, 0L, constTimestamp, + TimestampType.CREATE_TIME, sampleKey, sampleValue, - ConsumerRecordUtils.toHeaders(sampleHeaders) + ConsumerRecordUtils.toHeaders(sampleHeaders), + Optional.of(0) ) pushMessage(objToSerializeSerializationSchema(topic), givenObj, topic, timestamp = constTimestamp) val result = readLastMessage(ConsumerRecordKeyValueSourceFactory, topic).head.asInstanceOf[ConsumerRecord[SampleKey, SampleValue]] diff --git a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSpec.scala b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSpec.scala index b9c4b55af01..25048bf2a86 100644 --- 
a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSpec.scala +++ b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSpec.scala @@ -5,7 +5,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.java.typeutils.MapTypeInfo import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, ScalaCaseClassSerializer} import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} import org.apache.kafka.clients.consumer.ConsumerRecord @@ -50,28 +49,15 @@ class ConsumerRecordDeserializationSpec extends FunSuite with Matchers with Kafk override def createSerializer(config: ExecutionConfig): TypeSerializer[SampleKey] = new ScalaCaseClassSerializer[SampleKey](classOf[SampleKey], sampleKeyFieldTypes.map(_.createSerializer(config)).toArray) } - val fieldNames = List("key", "topic", "partition", "offset", "timestamp", "headers") - val fieldTypes = List( - sampleKeyTypeInformation, - TypeInformation.of(classOf[String]), - TypeInformation.of(classOf[Integer]), - TypeInformation.of(classOf[java.lang.Long]), - TypeInformation.of(classOf[java.lang.Long]), - new MapTypeInfo(classOf[String], classOf[String]) - ) - val typeInfo3 = new CaseClassTypeInfo[InputMeta[SampleKey]](classOf[InputMeta[SampleKey]], Array.empty, fieldTypes, fieldNames){ - override def createSerializer(config: ExecutionConfig): TypeSerializer[InputMeta[SampleKey]] = - new ScalaCaseClassSerializer[InputMeta[SampleKey]](classOf[InputMeta[SampleKey]], fieldTypes.map(_.createSerializer(config)).toArray) - } - val givenObj = InputMeta[SampleKey](SampleKey("one", 2), "dummy", 3, 4L, 5L, Map("one" -> "header value", "two" -> null).asJava) + val 
typeInformation = InputMeta.typeInformation[SampleKey](sampleKeyTypeInformation) + val givenObj = InputMeta[SampleKey](SampleKey("one", 2), "dummy", 3, 4L, 5L, TimestampType.CREATE_TIME, Map("one" -> "header value", "two" -> null).asJava, 6) - serializeRoundTrip(givenObj, typeInfo3, executionConfigWithoutKryo)() - serializeRoundTrip(givenObj, typeInfo3, executionConfigWithKryo)() + serializeRoundTrip(givenObj, typeInformation, executionConfigWithoutKryo)() + serializeRoundTrip(givenObj, typeInformation, executionConfigWithKryo)() } - private def serializeRoundTrip[T](record: T, typeInfo: TypeInformation[T], executionConfig: ExecutionConfig - = executionConfigWithoutKryo)(expected:T = record): T = { + private def serializeRoundTrip[T](record: T, typeInfo: TypeInformation[T], executionConfig: ExecutionConfig = executionConfigWithoutKryo)(expected:T = record): T = { val serializer = typeInfo.createSerializer(executionConfig) serializeRoundTripWithSerializers(record, serializer, serializer)(expected) } diff --git a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatterSpec.scala b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatterSpec.scala index 2fb894dd652..9d391bc6ec2 100644 --- a/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatterSpec.scala +++ b/engine/flink/kafka-util/src/test/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordToJsonFormatterSpec.scala @@ -1,12 +1,16 @@ package pl.touk.nussknacker.engine.kafka.consumerrecord +import java.util.Optional + import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Headers +import org.apache.kafka.common.record.TimestampType import org.scalatest.{Assertion, BeforeAndAfterAll, FunSuite, Matchers} import pl.touk.nussknacker.engine.kafka._ import 
pl.touk.nussknacker.engine.kafka.SampleConsumerRecordSerializationSchemaFactory import pl.touk.nussknacker.engine.kafka.KafkaSourceFactoryMixin._ -class ConsumerRecordToJsonFormatterSpec extends FunSuite with Matchers with KafkaSpec with BeforeAndAfterAll { +class ConsumerRecordToJsonFormatterSpec extends FunSuite with Matchers with KafkaSpec with BeforeAndAfterAll { private val topic = "dummyTopic" private lazy val kafkaConfig = KafkaConfig.parseConfig(config) @@ -22,7 +26,7 @@ class ConsumerRecordToJsonFormatterSpec extends FunSuite with Matchers with Kafk private val sampleKey = SampleKey("one", 2) private val sampleValue = SampleValue("lorem", "ipsum") - private val sampleHeaders = ConsumerRecordUtils.toHeaders(Map("first" -> "not empty", "second" -> null)) + private val sampleHeaders: Headers = ConsumerRecordUtils.toHeaders(Map("first" -> "not empty", "second" -> null)) test("check sample serializer and deserializer") { val resultKeyBytes = sampleKeyJsonSerializer.serialize(topic, sampleKey) @@ -36,7 +40,7 @@ class ConsumerRecordToJsonFormatterSpec extends FunSuite with Matchers with Kafk test("prepare and parse test data from ConsumerRecord with key, with headers") { val sampleKeyBytes = sampleKeyJsonSerializer.serialize(topic, sampleKey) val sampleValueBytes = sampleValueJsonSerializer.serialize(topic, sampleValue) - val givenObj = SerializableConsumerRecord.createConsumerRecord(topic, 11, 22L,100L, sampleKeyBytes, sampleValueBytes, sampleHeaders) + val givenObj = SerializableConsumerRecord.createConsumerRecord(topic, 11, 22L,100L, TimestampType.NO_TIMESTAMP_TYPE, sampleKeyBytes, sampleValueBytes, sampleHeaders, Optional.empty[Integer]) val resultBytes = sampleKeyValueFormatter.prepareGeneratedTestData(List(givenObj)) val resultObj = sampleKeyValueFormatter.parseDataForTest(topic, resultBytes).head checkResult(resultObj, givenObj) diff --git 
a/engine/flink/management/sample/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala b/engine/flink/management/sample/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala index 92796c343d1..2e761436808 100644 --- a/engine/flink/management/sample/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala +++ b/engine/flink/management/sample/src/main/scala/pl/touk/nussknacker/engine/management/sample/DevProcessConfigCreator.scala @@ -1,6 +1,7 @@ package pl.touk.nussknacker.engine.management.sample import java.time.LocalDateTime + import com.typesafe.config.Config import io.circe.Encoder import org.apache.flink.api.common.serialization.SimpleStringSchema @@ -20,7 +21,6 @@ import pl.touk.nussknacker.engine.flink.util.transformer.aggregate.sampleTransfo import pl.touk.nussknacker.engine.flink.util.transformer.outer.OuterJoinTransformer import pl.touk.nussknacker.engine.flink.util.transformer.{TransformStateTransformer, UnionTransformer, UnionWithMemoTransformer} import pl.touk.nussknacker.engine.kafka.{BasicFormatter, KafkaConfig} -import pl.touk.nussknacker.engine.kafka.generic.sources.EspValueDeserializaitionSchemaFactory import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema import pl.touk.nussknacker.engine.kafka.sink.KafkaSinkFactory import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory @@ -34,6 +34,7 @@ import pl.touk.nussknacker.engine.management.sample.source._ import pl.touk.nussknacker.engine.management.sample.transformer._ import pl.touk.nussknacker.engine.util.LoggingListener import net.ceedubs.ficus.Ficus._ +import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory object DevProcessConfigCreator { val oneElementValue = "One element" @@ -75,7 +76,7 @@ class DevProcessConfigCreator extends ProcessConfigCreator { override def listeners(processObjectDependencies: 
ProcessObjectDependencies) = List(LoggingListener) override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory[_]]] = Map( - "real-kafka" -> all(new KafkaSourceFactory[String, String](new EspValueDeserializaitionSchemaFactory(new SimpleStringSchema), + "real-kafka" -> all(new KafkaSourceFactory[String, String](new FixedValueDeserializaitionSchemaFactory(new SimpleStringSchema), None, BasicFormatter, processObjectDependencies)), diff --git a/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala b/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala index eeda9f00368..20e7e590f73 100644 --- a/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala +++ b/engine/flink/process/src/main/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetection.scala @@ -8,7 +8,7 @@ import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, OptionTypeInfo, import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.typed.TypedMap import pl.touk.nussknacker.engine.api.typed.typing._ -import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection +import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypingResultAwareTypeInformationCustomisation, TypeInformationDetectionForTypingResult} import pl.touk.nussknacker.engine.api.{Context, InterpretationResult, PartReference, ValueWithContext} import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.{TypedJavaMapTypeInformation, TypedMapTypeInformation, TypedScalaMapTypeInformation} import pl.touk.nussknacker.engine.process.typeinformation.internal.{FixedValueSerializers, InterpretationResultMapTypeInfo} @@ 
-25,7 +25,7 @@ object TypingResultAwareTypeInformationDetection { } class CompositeCustomisation(customisations: List[TypingResultAwareTypeInformationCustomisation]) extends TypingResultAwareTypeInformationCustomisation { - override def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]] = + override def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[TypingResult, TypeInformation[_]] = customisations.map(_.customise(originalDetection)).reduceOption(_.orElse(_)).getOrElse(Map.empty) } @@ -40,7 +40,7 @@ object TypingResultAwareTypeInformationDetection { To use it for state serialization one can use it directly in operators/process functions (compatibility is *NOT* guaranteed ATM). */ class TypingResultAwareTypeInformationDetection(customisation: - TypingResultAwareTypeInformationCustomisation) extends TypeInformationDetection { + TypingResultAwareTypeInformationCustomisation) extends TypeInformationDetectionForTypingResult { private val additionalTypeInfoDeterminer = customisation.customise(this) diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala index 7dd1ca1767c..8de79b369b1 100644 --- a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/helpers/SampleNodes.scala @@ -37,7 +37,7 @@ import pl.touk.nussknacker.engine.flink.test.RecordingExceptionHandler import pl.touk.nussknacker.engine.flink.util.service.TimeMeasuringService import pl.touk.nussknacker.engine.flink.util.signal.KafkaSignalStreamConnector import pl.touk.nussknacker.engine.flink.util.source.{CollectionSource, EspDeserializationSchema} -import 
pl.touk.nussknacker.engine.kafka.generic.sources.EspValueDeserializaitionSchemaFactory +import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory import pl.touk.nussknacker.engine.kafka.{BasicFormatter, KafkaConfig, KafkaUtils} import pl.touk.nussknacker.engine.process.SimpleJavaEnum @@ -844,7 +844,7 @@ object SampleNodes { @JsonCodec case class KeyValue(key: String, value: Int, date: Long) class KeyValueKafkaSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, KeyValue]( - new EspValueDeserializaitionSchemaFactory(new EspDeserializationSchema[KeyValue](e => CirceUtil.decodeJsonUnsafe[KeyValue](e))), + new FixedValueDeserializaitionSchemaFactory(new EspDeserializationSchema[KeyValue](e => CirceUtil.decodeJsonUnsafe[KeyValue](e))), Some(outOfOrdernessTimestampExtractor[ConsumerRecord[String, KeyValue]](_.value().date)), BasicFormatter, processObjectDependencies) diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryIntegrationSpec.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryIntegrationSpec.scala index 2a19e60d037..0c03a4d9f96 100644 --- a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryIntegrationSpec.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryIntegrationSpec.scala @@ -1,5 +1,6 @@ package pl.touk.nussknacker.engine.process.source +import org.apache.kafka.common.record.TimestampType import pl.touk.nussknacker.engine.kafka.source.InputMeta import pl.touk.nussknacker.engine.kafka.KafkaSourceFactoryMixin.{ObjToSerialize, SampleKey, SampleValue} import pl.touk.nussknacker.engine.process.source.KafkaSourceFactoryProcessConfigCreator.SinkForInputMeta @@ -72,9 +73,36 @@ class 
KafkaSourceFactoryIntegrationSpec extends KafkaSourceFactoryProcessMixin pushMessage(objToSerializeSerializationSchema(topic), givenObj, topic, timestamp = constTimestamp) run(process) { eventually { - SinkForInputMeta.data shouldBe List(InputMeta("""{"partOne":"some key","partTwo":123}""", topic, 0, 0L, constTimestamp, givenObj.headers.asJava)) + SinkForInputMeta.data shouldBe List(InputMeta("""{"partOne":"some key","partTwo":123}""", topic, 0, 0L, constTimestamp, TimestampType.CREATE_TIME, givenObj.headers.asJava, 0)) } } } + test("source with two input topics") { + val topicOne = "kafka-multitopic-one" + val topicTwo = "kafka-multitopic-two" + val topic = s"$topicOne, $topicTwo" + val givenObj = ObjToSerialize(TestSampleValue, TestSampleKey, TestSampleHeaders) + val process = createProcess(topic, SourceType.jsonValueWithMeta) + createTopic(topicOne) + createTopic(topicTwo) + pushMessage(objToSerializeSerializationSchema(topicOne), givenObj, topicOne, timestamp = constTimestamp) + pushMessage(objToSerializeSerializationSchema(topicTwo), givenObj, topicTwo, timestamp = constTimestamp) + run(process) { + eventually { + SinkForInputMeta.data.map(_.topic).toSet shouldEqual Set(topicOne, topicTwo) + } + } + } + + test("source with exception within prepareInitialParameters") { + val topic = "kafka-source-with-exception" + val givenObj = ObjToSerialize(TestSampleValue, TestSampleKey, TestSampleHeaders) + val process = createProcess(topic, SourceType.jsonValueWithMetaWithException) + + intercept[Exception] { + runAndVerifyResult(topic, process, givenObj) + }.getMessage should include ("Checking scenario: fetch topics from external source") + } + } diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessConfigCreator.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessConfigCreator.scala index bebb82fea8d..01a40bdecf5 100644 --- 
a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessConfigCreator.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessConfigCreator.scala @@ -3,6 +3,7 @@ package pl.touk.nussknacker.engine.process.source import io.circe.{Decoder, Encoder} import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import pl.touk.nussknacker.engine.api.definition.Parameter import pl.touk.nussknacker.engine.api.exception.ExceptionHandlerFactory import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, SinkFactory, SourceFactory, WithCategories} import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSink @@ -24,7 +25,8 @@ class KafkaSourceFactoryProcessConfigCreator(kafkaConfig: KafkaConfig) extends E override def sourceFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SourceFactory[_]]] = { Map( "kafka-jsonKeyJsonValueWithMeta" -> defaultCategory(KafkaConsumerRecordSourceHelper.jsonKeyValueWithMeta[SampleKey, SampleValue](processObjectDependencies, kafkaConfig)), - "kafka-jsonValueWithMeta" -> defaultCategory(KafkaConsumerRecordSourceHelper.jsonValueWithMeta[SampleValue](processObjectDependencies, kafkaConfig)) + "kafka-jsonValueWithMeta" -> defaultCategory(KafkaConsumerRecordSourceHelper.jsonValueWithMeta[SampleValue](processObjectDependencies, kafkaConfig)), + "kafka-jsonValueWithMeta-withException" -> defaultCategory(KafkaConsumerRecordSourceHelper.jsonValueWithMetaWithException[SampleValue](processObjectDependencies, kafkaConfig)) ) } @@ -102,5 +104,26 @@ object KafkaSourceFactoryProcessConfigCreator { ) kafkaSource.asInstanceOf[KafkaSourceFactory[Any, Any]] } + + // For the scenario where prepareInitialParameters fetches the list of available topics from some external repository and an exception occurs.
+ def jsonValueWithMetaWithException[V: ClassTag:Encoder:Decoder](processObjectDependencies: ProcessObjectDependencies, kafkaConfig: KafkaConfig): KafkaSourceFactory[Any, Any] = { + val deserializationSchemaFactory = new SampleConsumerRecordDeserializationSchemaFactory(new StringDeserializer with Serializable, createDeserializer[V]) + val serializationSchemaFactory = new SampleConsumerRecordSerializationSchemaFactory(new StringSerializer with Serializable, createSerializer[V]) + val testDataRecordFormatter = new ConsumerRecordToJsonFormatter( + deserializationSchemaFactory.create(List("dummyTopic"), kafkaConfig), + serializationSchemaFactory.create("dummyTopic", kafkaConfig) + ) + val kafkaSource = new KafkaSourceFactory( + deserializationSchemaFactory, + None, + testDataRecordFormatter, + processObjectDependencies + ) { + override protected def prepareInitialParameters: List[Parameter] = { + throw new IllegalArgumentException("Checking scenario: fetch topics from external source") + } + } + kafkaSource.asInstanceOf[KafkaSourceFactory[Any, Any]] + } } } diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessMixin.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessMixin.scala index 5f7208005b5..467194b23cc 100644 --- a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessMixin.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/source/KafkaSourceFactoryProcessMixin.scala @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.process.source import org.apache.flink.runtime.execution.ExecutionState import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.kafka.common.record.TimestampType import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} import pl.touk.nussknacker.engine.api.ProcessVersion import 
pl.touk.nussknacker.engine.api.deployment.DeploymentData @@ -31,7 +32,7 @@ trait KafkaSourceFactoryProcessMixin extends FunSuite with Matchers with KafkaSo protected override def beforeAll(): Unit = { super.beforeAll() val modelData = LocalModelData(config, creator) - registrar = FlinkProcessRegistrar(new FlinkProcessCompiler(modelData), config, ExecutionConfigPreparer.unOptimizedChain(modelData)) + registrar = FlinkProcessRegistrar(new FlinkProcessCompiler(modelData), ExecutionConfigPreparer.unOptimizedChain(modelData)) } before { @@ -68,7 +69,7 @@ trait KafkaSourceFactoryProcessMixin extends FunSuite with Matchers with KafkaSo pushMessage(objToSerializeSerializationSchema(topic), obj, topic, timestamp = constTimestamp) run(process) { eventually { - SinkForInputMeta.data shouldBe List(InputMeta(obj.key, topic, 0, 0L, constTimestamp, obj.headers.asJava)) + SinkForInputMeta.data shouldBe List(InputMeta(obj.key, topic, 0, 0L, constTimestamp, TimestampType.CREATE_TIME, obj.headers.asJava, 0)) SinkForSampleValue.data shouldBe List(obj.value) recordingExceptionHandler.data should have size 0 } @@ -80,15 +81,24 @@ trait KafkaSourceFactoryProcessMixin extends FunSuite with Matchers with KafkaSo type SourceType = Value val jsonKeyJsonValueWithMeta: SourceType.Value = Value("kafka-jsonKeyJsonValueWithMeta") val jsonValueWithMeta: SourceType.Value = Value("kafka-jsonValueWithMeta") + val jsonValueWithMetaWithException: SourceType.Value = Value("kafka-jsonValueWithMeta-withException") } protected def createProcess(topic: String, sourceType: SourceType.Value, customVariables: Map[String, String] = Map.empty): EspProcess = { //should check and recognize all variables based on #input and #inputMeta val inputVariables = Map("id" ->" #input.id", "field" -> "#input.field") - val metaVariables = Map("topic" -> "#inputMeta.topic", "partition" -> "#inputMeta.partition", "offset" -> "#inputMeta.offset", "timestamp" -> "#inputMeta.timestamp") + val metaVariables = Map( + "topic" -> 
"#inputMeta.topic", + "partition" -> "#inputMeta.partition", + "offset" -> "#inputMeta.offset", + "timestamp" -> "#inputMeta.timestamp", + "timestampType" -> "#inputMeta.timestampType.name", + "leaderEpoch" -> "#inputMeta.leaderEpoch" + ) val keyVariables = sourceType match { case SourceType.jsonKeyJsonValueWithMeta => Map("key1" -> "#inputMeta.key.partOne", "key2" -> "#inputMeta.key.partTwo") case SourceType.jsonValueWithMeta => Map("key" -> "#inputMeta.key") + case _ => Map.empty[String, String] } val headerVariables = Map("headers" -> "#inputMeta.headers.toString()") val checkAllVariables = inputVariables ++ metaVariables ++ keyVariables ++ headerVariables ++ customVariables diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypeInformationDetectionSpec.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypeInformationDetectionSpec.scala index ebd6ab2f2d6..1ed1a9553e7 100644 --- a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypeInformationDetectionSpec.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypeInformationDetectionSpec.scala @@ -8,7 +8,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.deployment.DeploymentData import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown} -import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection +import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypeInformationDetection, TypingResultAwareTypeInformationCustomisation, TypeInformationDetectionForTypingResult} import pl.touk.nussknacker.engine.api.{Context, InterpretationResult, ProcessVersion, ValueWithContext} import pl.touk.nussknacker.engine.flink.api.{ConfigGlobalParameters, DefaultAdditionalInformationSerializer, NkGlobalParameters} 
import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.TypedScalaMapTypeInformation @@ -70,7 +70,7 @@ class TypeInformationDetectionSpec extends FunSuite with Matchers { class CustomTypeInformationCustomisation extends TypingResultAwareTypeInformationCustomisation { - override def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[typing.TypingResult, TypeInformation[_]] = { + override def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[typing.TypingResult, TypeInformation[_]] = { case Unknown => new NothingTypeInfo } } diff --git a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala index b656bbe6af0..ee116b34c69 100644 --- a/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala +++ b/engine/flink/process/src/test/scala/pl/touk/nussknacker/engine/process/typeinformation/TypingResultAwareTypeInformationDetectionSpec.scala @@ -2,9 +2,9 @@ package pl.touk.nussknacker.engine.process.typeinformation import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Kryo, Serializer} - import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.util.Collections + import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.base.{IntSerializer, LongSerializer, StringSerializer} @@ -18,6 +18,7 @@ import pl.touk.nussknacker.engine.api.context.ValidationContext import pl.touk.nussknacker.engine.api.typed.typing import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedObjectTypingResult} import pl.touk.nussknacker.engine.api.{Context, 
ValueWithContext} +import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypingResultAwareTypeInformationCustomisation, TypeInformationDetectionForTypingResult} import pl.touk.nussknacker.engine.process.typeinformation.internal.typedobject.{BaseJavaMapBasedSerializer, TypedObjectBasedSerializerSnapshot, TypedObjectBasedTypeInformation, TypedObjectBasedTypeSerializer, TypedScalaMapSerializer} import pl.touk.nussknacker.engine.process.typeinformation.testTypedObject.{CustomObjectTypeInformation, CustomTypedObject} import pl.touk.nussknacker.engine.util.Implicits._ @@ -28,7 +29,7 @@ import scala.collection.immutable.ListMap class TypingResultAwareTypeInformationDetectionSpec extends FunSuite with Matchers { private val informationDetection = new TypingResultAwareTypeInformationDetection(new TypingResultAwareTypeInformationCustomisation { - override def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[typing.TypingResult, TypeInformation[_]] = { + override def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[typing.TypingResult, TypeInformation[_]] = { case e: TypedObjectTypingResult if e.objType == Typed.typedClass[CustomTypedObject] => CustomObjectTypeInformation(e.fields.mapValuesNow(originalDetection.forType)) }