CR fixes part 2, TypeInformation for InputMeta
gskrobisz committed May 11, 2021
1 parent 3e8c213 commit 787efed
Showing 23 changed files with 333 additions and 111 deletions.
15 changes: 10 additions & 5 deletions docs/MigrationGuide.md
@@ -62,12 +62,17 @@ To see biggest differences please consult the [changelog](Changelog.md).
* [#1557](https://github.com/TouK/nussknacker/pull/1556) Some classes from the standalone engine were moved to the standalone api, to remove the engine dependency on (model) utils:
`StandaloneContext`, `StandaloneContextLifecycle`, `MetricsProvider`
* [#1558](https://github.com/TouK/nussknacker/pull/1558) `FlinkProcessRegistrar` takes configuration directly from `FlinkProcessCompiler` (this can affect some tests setup)
* [#1512](https://github.com/TouK/nussknacker/pull/1512) New kafka source `KafkaGenericNodeSourceFactory`, which is based on `GenericNodeTransformation`, gives access to setup of `ValidationContext` and `Context` initialization.
It uses `KafkaGenericContextInitializer` to initialize `Context` with additional variable with kafka event metadata. Factory requires proper deserialization to `ConsumerRecord` (see `ConsumerRecordDeserializationSchemaFactory`).
To replace basic `KafkaSourceFactory` with `KafkaGenericNodeSourceFactory`:
- use `ConsumerRecordDeserializationSchemaFactory` with current `DeserializationSchema` as a value deserializer, add key deserializer
* [#1512](https://github.com/TouK/nussknacker/pull/1512) Replaced `KafkaSourceFactory` with a source based on `GenericNodeTransformation`, which gives access to the setup of `ValidationContext` and `Context` initialization.
It uses `KafkaContextInitializer` to initialize `Context` with an additional variable containing Kafka event metadata. The source factory requires deserialization to `ConsumerRecord` (see `ConsumerRecordDeserializationSchemaFactory`).
To migrate `KafkaSourceFactory` (a minimal sketch follows this list):
- provide a deserializer factory:
- use `ConsumerRecordDeserializationSchemaFactory` with the current `DeserializationSchema` as the value deserializer and add a key deserializer,
- or use `FixedValueDeserializaitionSchemaFactory` with a simple key-as-string deserializer
- the current `RecordFormatter` should be sufficient for value-only serialization, or use `ConsumerRecordToJsonFormatter` to also serialize metadata
- implement timestampAssigner that is able to extract time from `ConsumerRecord[K, V]`
- provide a timestampAssigner that is able to extract the time from `ConsumerRecord[K, V]`
Also:
- removed `BaseKafkaSourceFactory` with multiple-topic support: use `KafkaSourceFactory` instead; see the test "source with two input topics"
- removed `SingleTopicKafkaSourceFactory`: use `KafkaSourceFactory` with custom `prepareInitialParameters`, `contextTransformation` and `extractTopics` to alter the parameter list and provide a constant topic value.
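
A minimal migration sketch (not part of this change), based on the 0.4.x APIs used in the diffs below; `Transaction`, `decodeTransaction` and `processObjectDependencies` are illustrative placeholders:

```scala
import org.apache.flink.api.scala._ // provides implicit TypeInformation if EspDeserializationSchema requires it
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory

// Illustrative value type and decoder; replace with the project's own.
case class Transaction(clientId: String, amount: Int)

def transactionSource(decodeTransaction: Array[Byte] => Transaction,
                      // the timestamp assigner now sees the whole ConsumerRecord, not just the value
                      timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, Transaction]]],
                      processObjectDependencies: ProcessObjectDependencies): FlinkSourceFactory[ConsumerRecord[String, Transaction]] = {
  // 1. Keep the existing value-only DeserializationSchema and wrap it;
  //    FixedValueDeserializaitionSchemaFactory reads the key as a plain String.
  val valueSchema = new EspDeserializationSchema[Transaction](decodeTransaction)
  val schemaFactory = new FixedValueDeserializaitionSchemaFactory(valueSchema)
  // 2. JsonRecordFormatter keeps value-only test-data serialization;
  //    switch to ConsumerRecordToJsonFormatter to also serialize key and metadata.
  new KafkaSourceFactory[String, Transaction](schemaFactory, timestampAssigner, JsonRecordFormatter, processObjectDependencies)
}
```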

## In version 0.3.0

@@ -28,6 +28,7 @@
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler;
import pl.touk.nussknacker.engine.javaapi.process.ExpressionConfig;
import pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator;
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory;
import pl.touk.nussknacker.engine.kafka.generic.sources;
import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchemaFactory;
import pl.touk.nussknacker.engine.kafka.serialization.schemas;
@@ -86,7 +87,7 @@ public TypeInformation<Transaction> getProducedType() {
}
};
return new KafkaSourceFactory<>(
new sources.EspValueDeserializaitionSchemaFactory<>(schema),
new FixedValueDeserializaitionSchemaFactory<>(schema),
Option.apply(extractor),
sources.JsonRecordFormatter$.MODULE$,
processObjectDependencies,
@@ -1,6 +1,7 @@
package pl.touk.nussknacker.engine.demo

import java.time.Duration

import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -19,7 +20,8 @@ import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestam
import pl.touk.nussknacker.engine.flink.util.exception.BrieflyLoggingExceptionHandler
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.flink.util.transformer.{TransformStateTransformer, UnionTransformer}
import pl.touk.nussknacker.engine.kafka.generic.sources.{EspValueDeserializaitionSchemaFactory, JsonRecordFormatter}
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter
import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema
import pl.touk.nussknacker.engine.kafka.sink.KafkaSinkFactory
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
@@ -71,7 +73,7 @@ class DemoProcessConfigCreator extends ProcessConfigCreator {
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, T]]],
processObjectDependencies: ProcessObjectDependencies): FlinkSourceFactory[ConsumerRecord[String, T]] = {
val schema = new EspDeserializationSchema[T](bytes => decode(bytes))
val schemaFactory = new EspValueDeserializaitionSchemaFactory(schema)
val schemaFactory = new FixedValueDeserializaitionSchemaFactory(schema)
new KafkaSourceFactory[String, T](schemaFactory, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(classTag[String], ClassTag(implicitly[TypeInformation[T]].getTypeClass))
}

@@ -20,3 +20,12 @@ trait TypeInformationDetection extends Serializable {
def forValueWithContext[T](validationContext: ValidationContext, value: TypingResult): TypeInformation[ValueWithContext[T]]

}

/**
* Trait that allows providing more detailed TypeInformation when the TypingResult is known.
*/
trait TypeInformationDetectionForTypingResult extends TypeInformationDetection {

def forType(typingResult: TypingResult): TypeInformation[Any]

}
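
A hedged usage sketch (not part of this diff); `Typed[...]` is the standard Nussknacker typing helper, and the package is assumed from the rename in this commit:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetectionForTypingResult

// Given a detection that understands TypingResult, a component can ask for the
// TypeInformation of a statically known type instead of falling back to Kryo.
def bigDecimalTypeInfo(detection: TypeInformationDetectionForTypingResult): TypeInformation[Any] =
  detection.forType(Typed[java.math.BigDecimal])
```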
@@ -1,4 +1,4 @@
package pl.touk.nussknacker.engine.process.typeinformation
package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
@@ -9,6 +9,6 @@ import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
*/
trait TypingResultAwareTypeInformationCustomisation {

def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]]
def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[TypingResult, TypeInformation[_]]

}
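
For orientation, a hypothetical customisation sketch (the chosen type and field names are illustrative assumptions from the Nussknacker typing API, not part of this commit):

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.typed.typing.{TypedClass, TypingResult}
import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypeInformationDetectionForTypingResult, TypingResultAwareTypeInformationCustomisation}

// Returns a dedicated TypeInformation for BigDecimal; every other TypingResult is left
// to the original detection because the partial function is not defined for it.
object BigDecimalCustomisation extends TypingResultAwareTypeInformationCustomisation {
  override def customise(originalDetection: TypeInformationDetectionForTypingResult): PartialFunction[TypingResult, TypeInformation[_]] = {
    case tc: TypedClass if tc.klass == classOf[java.math.BigDecimal] =>
      TypeInformation.of(classOf[java.math.BigDecimal])
  }
}
```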
@@ -52,12 +52,16 @@ abstract class ConsumerRecordDeserializationSchemaFactory[K, V] extends KafkaDes
record.serializedValueSize(),
key,
value,
record.headers()
record.headers(),
record.leaderEpoch()
)
}

override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false

// TODO: Provide a better way to calculate TypeInformation. Here, in case of serialization (of a generic type), Kryo is used.
// It is assumed that, since the lifespan of this ConsumerRecord[K, V] object (inside the source) is short, this implementation
// is sufficient.
override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = {
val clazz = classTag[ConsumerRecord[K, V]].runtimeClass.asInstanceOf[Class[ConsumerRecord[K, V]]]
TypeInformation.of(clazz)
@@ -1,8 +1,8 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord

import java.nio.charset.StandardCharsets
import java.util.Optional

import com.github.ghik.silencer.silent
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema}
@@ -14,16 +14,20 @@ import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils}
import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, RecordFormatter}
import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord._

import scala.annotation.nowarn

@silent("deprecated")
@nowarn("cat=deprecation")
/**
* RecordFormatter used to encode and decode whole raw kafka event (ConsumerRecord) in json format.
*
* @param deserializationSchema - schema used to convert raw kafka event to serializable representation (see SerializableConsumerRecord)
* @param serializationSchema - schema used to convert serializable representation to raw kafka event
* @tparam K - event key type
* @tparam V - event value type
*/
class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]],
serializationSchema: KafkaSerializationSchema[ConsumerRecord[K, V]])
extends RecordFormatter {

implicit val consumerRecordDecoder: Decoder[SerializableConsumerRecord[K, V]] = deriveDecoder
implicit val consumerRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveEncoder
protected val consumerRecordDecoder: Decoder[SerializableConsumerRecord[K, V]] = deriveDecoder
protected val consumerRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveEncoder

override protected def formatRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Array[Byte] = {
val deserializedRecord = deserializationSchema.deserialize(record)
@@ -34,14 +38,16 @@ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](dese
Option(deserializedRecord.partition()),
Option(deserializedRecord.offset()),
Option(deserializedRecord.timestamp()),
Option(ConsumerRecordUtils.toMap(deserializedRecord.headers()).mapValues(s => Option(s)))
Option(deserializedRecord.timestampType().name),
Option(ConsumerRecordUtils.toMap(deserializedRecord.headers()).mapValues(s => Option(s))),
Option(deserializedRecord.leaderEpoch().orElse(null)).map(_.intValue()) //avoids covert null -> 0 conversion
)
implicitly[Encoder[SerializableConsumerRecord[K, V]]].apply(serializableRecord).noSpaces.getBytes(StandardCharsets.UTF_8)
consumerRecordEncoder(serializableRecord).noSpaces.getBytes(StandardCharsets.UTF_8)
}

override protected def parseRecord(topic: String, bytes: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = {
val serializableRecord = CirceUtil.decodeJsonUnsafe[SerializableConsumerRecord[K, V]](bytes) // decode json in SerializableConsumerRecord[K, V] domain
val serializableConsumerRecord = SerializableConsumerRecord.from(topic, serializableRecord) // update with defaults if fields are missing in json
val serializableRecord = CirceUtil.decodeJsonUnsafe(bytes)(consumerRecordDecoder) // decode json in SerializableConsumerRecord[K, V] domain
val serializableConsumerRecord = toConsumerRecord(topic, serializableRecord) // update with defaults if fields are missing in json
// Here serialization schema and ProducerRecord are used to transform key and value to proper Array[Byte].
// Other properties are ignored by serializer and are based on values provided by decoded json (or default empty values).
val producerRecord = serializationSchema.serialize(serializableConsumerRecord, serializableConsumerRecord.timestamp()) // serialize K and V to Array[Byte]
@@ -50,38 +56,62 @@ class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](dese
serializableConsumerRecord.partition,
serializableConsumerRecord.offset,
serializableConsumerRecord.timestamp,
serializableConsumerRecord.timestampType(),
producerRecord.key(),
producerRecord.value(),
producerRecord.headers()
producerRecord.headers(),
serializableConsumerRecord.leaderEpoch()
)
}

override protected def testDataSplit: TestDataSplit = TestParsingUtils.newLineSplit

}

/**
* Wrapper for ConsumerRecord fields, used for test data serialization, e.g. JSON serialization.
* All fields apart from value are optional.
*/
case class SerializableConsumerRecord[K, V](key: Option[K],
value: V,
topic: Option[String],
partition: Option[Int],
offset: Option[Long],
timestamp: Option[Long],
timestampType: Option[String],
headers: Option[Map[String, Option[String]]],
leaderEpoch: Option[Int]) {

case class SerializableConsumerRecord[K, V](key: Option[K], value: V, topic: Option[String], partition: Option[Int], offset: Option[Long], timestamp: Option[Long], headers: Option[Map[String, Option[String]]])
}

object SerializableConsumerRecord {

def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, key: K, value: V, headers: Headers): ConsumerRecord[K, V] = {
new ConsumerRecord(topic, partition, offset, timestamp,
TimestampType.NO_TIMESTAMP_TYPE, ConsumerRecord.NULL_CHECKSUM.longValue(),
ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE,
key, value, headers
/**
* Creates a ConsumerRecord with defaults for checksum, serializedKeySize and serializedValueSize.
*/
def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, timestampType: TimestampType, key: K, value: V, headers: Headers, leaderEpoch: Optional[Integer]): ConsumerRecord[K, V] = {
new ConsumerRecord(topic, partition, offset,
timestamp, timestampType,
ConsumerRecord.NULL_CHECKSUM.longValue(), ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE,
key, value, headers,
leaderEpoch
)
}

def from[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = {
/**
* Converts SerializableConsumerRecord to ConsumerRecord, using default values for missing fields.
*/
def toConsumerRecord[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = {
createConsumerRecord(
record.topic.getOrElse(topic),
record.partition.getOrElse(0),
record.offset.getOrElse(0L),
record.timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP),
record.timestampType.map(TimestampType.forName).getOrElse(TimestampType.NO_TIMESTAMP_TYPE),
record.key.getOrElse(null.asInstanceOf[K]),
record.value,
ConsumerRecordUtils.toHeaders(record.headers.map(_.mapValues(_.orNull)).getOrElse(Map.empty))
ConsumerRecordUtils.toHeaders(record.headers.map(_.mapValues(_.orNull)).getOrElse(Map.empty)),
Optional.ofNullable(record.leaderEpoch.map(Integer.valueOf).orNull) //avoids covert null -> 0 conversion
)
}
}
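
For reference, a hedged example of the JSON shape this wrapper serializes to (field values are illustrative; every field except `value` may be omitted):

```scala
// Illustrative only: a fully populated SerializableConsumerRecord[String, String]
// as produced by the derived circe encoder; "CreateTime" is TimestampType.CREATE_TIME.name.
val exampleRecordJson: String =
  """{
    |  "key": "some-key",
    |  "value": "some-value",
    |  "topic": "transactions",
    |  "partition": 0,
    |  "offset": 42,
    |  "timestamp": 1620000000000,
    |  "timestampType": "CreateTime",
    |  "headers": { "trace-id": "abc" },
    |  "leaderEpoch": 1
    |}""".stripMargin
```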
@@ -0,0 +1,19 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord

import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import pl.touk.nussknacker.engine.kafka.KafkaConfig

/**
* Wrapper for a value-only DeserializationSchema. For Kafka event key data it uses a simple "Array[Byte] to String" deserialization.
* Used with simple, value-only sources where the event key is empty or ignored.
*
* @param valueSchema - value deserialization schema (e.g. EspDeserializationSchema)
* @tparam V - type of value of deserialized ConsumerRecord
*/
class FixedValueDeserializaitionSchemaFactory[V](valueSchema: DeserializationSchema[V]) extends ConsumerRecordDeserializationSchemaFactory[String, V]{
override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] = new StringDeserializer
override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[V] = new Deserializer[V] {
override def deserialize(topic: String, data: Array[Byte]): V = valueSchema.deserialize(data)
}
}
@@ -3,20 +3,19 @@ package pl.touk.nussknacker.engine.kafka.generic
import java.nio.charset.StandardCharsets
import java.util
import java.util.Collections

import io.circe.{Decoder, Json, JsonObject}
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, Source, TestDataGenerator}
import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils}
import pl.touk.nussknacker.engine.api.typed._
import pl.touk.nussknacker.engine.api.{CirceUtil, MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordDeserializationSchemaFactory
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory
import pl.touk.nussknacker.engine.kafka.source.{KafkaSource, KafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.{BasicFormatter, KafkaConfig, KafkaUtils}
import pl.touk.nussknacker.engine.util.Implicits._
@@ -27,15 +26,8 @@ object sources {

import collection.JavaConverters._

class EspValueDeserializaitionSchemaFactory[T](schema: DeserializationSchema[T]) extends ConsumerRecordDeserializationSchemaFactory[String, T]{
override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] = new StringDeserializer
override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[T] = new Deserializer[T] {
override def deserialize(topic: String, data: Array[Byte]): T = schema.deserialize(data)
}
}

class GenericJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies) extends KafkaSourceFactory[String, java.util.Map[_, _]](
new EspValueDeserializaitionSchemaFactory(JsonMapDeserialization), None, JsonRecordFormatter, processObjectDependencies)
new FixedValueDeserializaitionSchemaFactory(JsonMapDeserialization), None, JsonRecordFormatter, processObjectDependencies)

class GenericTypedJsonSourceFactory(processObjectDependencies: ProcessObjectDependencies)
extends FlinkSourceFactory[TypedMap] with Serializable {
