CR fixes
gskrobisz committed May 11, 2021
1 parent 322b9d2 commit 3e8c213
Showing 22 changed files with 666 additions and 659 deletions.
@@ -12,6 +12,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.ProcessListener;
import pl.touk.nussknacker.engine.api.Service;
@@ -22,7 +23,6 @@
import pl.touk.nussknacker.engine.api.process.SourceFactory;
import pl.touk.nussknacker.engine.api.process.WithCategories;
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender;
import pl.touk.nussknacker.engine.api.test.TestParsingUtils;
import pl.touk.nussknacker.engine.demo.LoggingExceptionHandlerFactory;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler;
@@ -58,16 +58,16 @@ public Map<String, WithCategories<Service>> services(ProcessObjectDependencies p

@Override
public Map<String, WithCategories<SourceFactory<?>>> sourceFactories(ProcessObjectDependencies processObjectDependencies) {
KafkaSourceFactory<Transaction> sourceFactory = getTransactionKafkaSourceFactory(processObjectDependencies);
KafkaSourceFactory<String, Transaction> sourceFactory = getTransactionKafkaSourceFactory(processObjectDependencies);
Map<String, WithCategories<SourceFactory<?>>> m = new HashMap<>();
m.put("kafka-transaction", all(sourceFactory));
return m;
}

private KafkaSourceFactory<Transaction> getTransactionKafkaSourceFactory(ProcessObjectDependencies processObjectDependencies) {
TimestampWatermarkHandler<Transaction> extractor = new StandardTimestampWatermarkHandler<>(WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((SerializableTimestampAssigner<Transaction>) (element, recordTimestamp) -> element.eventDate));
private KafkaSourceFactory<String, Transaction> getTransactionKafkaSourceFactory(ProcessObjectDependencies processObjectDependencies) {
TimestampWatermarkHandler<ConsumerRecord<String, Transaction>> extractor = new StandardTimestampWatermarkHandler<>(WatermarkStrategy
.<ConsumerRecord<String, Transaction>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((SerializableTimestampAssigner<ConsumerRecord<String, Transaction>>) (element, recordTimestamp) -> element.value().eventDate));

DeserializationSchema<Transaction> schema = new DeserializationSchema<Transaction>() {
@Override
@@ -86,10 +86,11 @@ public TypeInformation<Transaction> getProducedType() {
}
};
return new KafkaSourceFactory<>(
schema,
new sources.EspValueDeserializaitionSchemaFactory<>(schema),
Option.apply(extractor),
sources.JsonRecordFormatter$.MODULE$,
processObjectDependencies,
ClassTag$.MODULE$.apply(String.class),
ClassTag$.MODULE$.apply(Transaction.class)
);
}
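Both config creators now assign event timestamps from the deserialized value of the ConsumerRecord (element.value().eventDate). For orientation, a minimal sketch of the value type involved; only the eventDate field is visible in this diff, the remaining fields and their types are assumptions:

    // Sketch of the demo Transaction value type: eventDate is the field read by the
    // timestamp assigners above and below; the other fields are assumed for illustration.
    case class Transaction(clientId: String, amount: Int, eventDate: Long)
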
@@ -6,25 +6,26 @@ import io.circe.Json
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.CirceUtil.decodeJsonUnsafe
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.exception.{EspExceptionHandler, ExceptionHandlerFactory}
import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, _}
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender
import pl.touk.nussknacker.engine.api.test.TestParsingUtils
import pl.touk.nussknacker.engine.demo.custom.{EventsCounter, TransactionAmountAggregator}
import pl.touk.nussknacker.engine.demo.service.{AlertService, ClientService}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.exception.BrieflyLoggingExceptionHandler
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.flink.util.transformer.{TransformStateTransformer, UnionTransformer}
import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter
import pl.touk.nussknacker.engine.kafka.generic.sources.{EspValueDeserializaitionSchemaFactory, JsonRecordFormatter}
import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema
import pl.touk.nussknacker.engine.kafka.sink.KafkaSinkFactory
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
import pl.touk.nussknacker.engine.util.LoggingListener

import scala.reflect.ClassTag
import scala.reflect.{ClassTag, classTag}

class DemoProcessConfigCreator extends ProcessConfigCreator {

@@ -58,7 +59,7 @@ class DemoProcessConfigCreator extends ProcessConfigCreator {
}

private def createTransactionSource(processObjectDependencies: ProcessObjectDependencies) = {
val transactionTimestampExtractor = StandardTimestampWatermarkHandler.boundedOutOfOrderness[Transaction](_.eventDate, Duration.ofMinutes(10))
val transactionTimestampExtractor = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, Transaction]](_.value().eventDate, Duration.ofMinutes(10))
kafkaSource[Transaction](decodeJsonUnsafe[Transaction](_), Some(transactionTimestampExtractor), processObjectDependencies)
}

@@ -67,10 +68,11 @@
}

private def kafkaSource[T: TypeInformation](decode: Array[Byte] => T,
timestampAssigner: Option[TimestampWatermarkHandler[T]],
processObjectDependencies: ProcessObjectDependencies): SourceFactory[T] = {
timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, T]]],
processObjectDependencies: ProcessObjectDependencies): FlinkSourceFactory[ConsumerRecord[String, T]] = {
val schema = new EspDeserializationSchema[T](bytes => decode(bytes))
new KafkaSourceFactory[T](schema, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(ClassTag(implicitly[TypeInformation[T]].getTypeClass))
val schemaFactory = new EspValueDeserializaitionSchemaFactory(schema)
new KafkaSourceFactory[String, T](schemaFactory, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(classTag[String], ClassTag(implicitly[TypeInformation[T]].getTypeClass))
}

override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
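The kafkaSource helper now returns FlinkSourceFactory[ConsumerRecord[String, T]], so other sources in DemoProcessConfigCreator can be built the same way. A minimal sketch of a second, JSON-decoded source under assumed names (a Client case class with an implicit io.circe.Decoder[Client] in scope; no timestamp handler attached):

    // Hypothetical sibling of createTransactionSource, inside DemoProcessConfigCreator.
    // Assumes: case class Client with a circe Decoder; TypeInformation[Client] is derived
    // via the org.apache.flink.api.scala._ import already present in this file.
    private def createClientSource(processObjectDependencies: ProcessObjectDependencies) =
      kafkaSource[Client](decodeJsonUnsafe[Client](_), None, processObjectDependencies)
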
@@ -29,7 +29,7 @@ trait KafkaAvroDeserializationSchemaFactory extends Serializable {
* @return KafkaDeserializationSchema
*/
def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any]

@@ -47,7 +47,7 @@ abstract class KafkaAvroValueDeserializationSchemaFactory
protected def createValueTypeInfo[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[T]

override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {
new KafkaDeserializationSchema[V] {
@@ -91,7 +91,7 @@ abstract class KafkaAvroKeyValueDeserializationSchemaFactory
protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O]

override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
keySchemaDataOpt: Option[RuntimeSchemaData] = None,
keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {

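Dropping the `= None` default on keySchemaDataOpt forces every call site to state the key schema explicitly. A hedged sketch of a value-only call after this change (schemaFactory, kafkaConfig and valueSchemaData are placeholder names, not taken from the commit; GenericRecord is org.apache.avro.generic.GenericRecord):

    // Value-only deserialization: the absent key schema is now spelled out by the caller.
    val deserializationSchema = schemaFactory.create[String, GenericRecord](
      kafkaConfig,
      keySchemaDataOpt = None,
      valueSchemaDataOpt = Some(valueSchemaData)
    )
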
@@ -1,15 +1,15 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchemaFactory

import scala.annotation.nowarn
import scala.reflect.{ClassTag, classTag}
import scala.reflect.classTag

/**
* Produces deserialization schema that describes how to turn the Kafka raw [[org.apache.kafka.clients.consumer.ConsumerRecord]]
@@ -23,19 +23,24 @@ import scala.reflect.{ClassTag, classTag}
*/
@silent("deprecated")
@nowarn("cat=deprecation")
class ConsumerRecordDeserializationSchemaFactory[K: ClassTag, V: ClassTag](keyDeserializationSchema: DeserializationSchema[K],
valueDeserializationSchema: DeserializationSchema[V])
extends KafkaDeserializationSchemaFactory[ConsumerRecord[K, V]] {
abstract class ConsumerRecordDeserializationSchemaFactory[K, V] extends KafkaDeserializationSchemaFactory[ConsumerRecord[K, V]] with Serializable {

override def create(topics: List[String], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[ConsumerRecord[K, V]] = {
protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K]

protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[V]

val clazz = classTag[ConsumerRecord[K, V]].runtimeClass.asInstanceOf[Class[ConsumerRecord[K, V]]]
override def create(topics: List[String], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[ConsumerRecord[K, V]] = {

new KafkaDeserializationSchema[ConsumerRecord[K, V]] {

@transient
private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig)
@transient
private lazy val valueDeserializer = createValueDeserializer(kafkaConfig)

override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = {
val key = keyDeserializationSchema.deserialize(record.key())
val value = valueDeserializationSchema.deserialize(record.value())
val key = keyDeserializer.deserialize(record.topic(), record.key())
val value = valueDeserializer.deserialize(record.topic(), record.value())
new ConsumerRecord[K, V](
record.topic(),
record.partition(),
@@ -53,7 +58,10 @@ class ConsumerRecordDeserializationSchemaFactory[K: ClassTag, V: ClassTag](keyDe

override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false

override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = TypeInformation.of(clazz)
override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = {
val clazz = classTag[ConsumerRecord[K, V]].runtimeClass.asInstanceOf[Class[ConsumerRecord[K, V]]]
TypeInformation.of(clazz)
}
}
}

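The factory is now abstract: concrete variants supply a Kafka Deserializer for each side, and the @transient lazy fields above create them only on the task manager. A minimal illustrative subclass, not part of this commit:

    import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
    import pl.touk.nussknacker.engine.kafka.KafkaConfig

    // Illustrative only: plain UTF-8 strings for both key and value.
    class StringConsumerRecordDeserializationSchemaFactory
      extends ConsumerRecordDeserializationSchemaFactory[String, String] {

      override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] =
        new StringDeserializer

      override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] =
        new StringDeserializer
    }
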
@@ -1,88 +1,87 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord

import java.nio.charset.{Charset, StandardCharsets}
import java.nio.charset.StandardCharsets

import com.github.ghik.silencer.silent
import io.circe.Decoder.Result
import io.circe.{Decoder, Encoder, HCursor, Json}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}
import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.TimestampType
import pl.touk.nussknacker.engine.api.CirceUtil
import pl.touk.nussknacker.engine.api.test.{TestDataSplit, TestParsingUtils}
import pl.touk.nussknacker.engine.kafka.{ConsumerRecordUtils, RecordFormatter}
import pl.touk.nussknacker.engine.util.json.BestEffortJsonEncoder
import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord._

import scala.annotation.nowarn

@silent("deprecated")
@nowarn("cat=deprecation")
class ConsumerRecordToJsonFormatter extends RecordFormatter {
class ConsumerRecordToJsonFormatter[K: Encoder:Decoder, V: Encoder:Decoder](deserializationSchema: KafkaDeserializationSchema[ConsumerRecord[K, V]],
serializationSchema: KafkaSerializationSchema[ConsumerRecord[K, V]])
extends RecordFormatter {

private val cs: Charset = StandardCharsets.UTF_8

// TODO: add better key-value encoding and decoding
implicit val consumerRecordEncoder: Encoder[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val encode: Encoder[Any] = BestEffortJsonEncoder(failOnUnkown = false).circeEncoder
new Encoder[ConsumerRecord[Array[Byte], Array[Byte]]] {
override def apply(a: ConsumerRecord[Array[Byte], Array[Byte]]): Json = Json.obj(
"topic" -> encode(a.topic()),
"partition" -> encode(a.partition()),
"offset" -> encode(a.offset()),
"timestamp" -> encode(a.timestamp()),
"timestampType" -> encode(a.timestampType().name),
"serializedKeySize" -> encode(a.serializedKeySize()),
"serializedValueSize" -> encode(a.serializedValueSize()),
"key" -> encode(Option(a.key()).map(bytes => new String(bytes)).orNull),
"value" -> encode(new String(a.value())),
"leaderEpoch" -> encode(a.leaderEpoch().orElse(null)),
"checksum" -> encode(a.checksum()),
"headers" -> encode(ConsumerRecordUtils.toMap(a.headers()))
)
}
}

implicit val consumerRecordDecoder: Decoder[ConsumerRecord[Array[Byte], Array[Byte]]] = {
new Decoder[ConsumerRecord[Array[Byte], Array[Byte]]] {
override def apply(c: HCursor): Result[ConsumerRecord[Array[Byte], Array[Byte]]] = {
for {
topic <- c.downField("topic").as[String].right
partition <- c.downField("partition").as[Int].right
offset <- c.downField("offset").as[Long].right
timestamp <- c.downField("timestamp").as[Option[Long]].right
timestampType <- c.downField("timestampType").as[Option[String]].right
serializedKeySize <- c.downField("serializedKeySize").as[Option[Int]].right
serializedValueSize <- c.downField("serializedValueSize").as[Option[Int]].right
key <- c.downField("key").as[Option[String]].right
value <- c.downField("value").as[Option[String]].right
leaderEpoch <- c.downField("leaderEpoch").as[Option[Int]].right
checksum <- c.downField("checksum").as[Option[java.lang.Long]].right
headers <- c.downField("headers").as[Map[String, Option[String]]].right
} yield new ConsumerRecord[Array[Byte], Array[Byte]](
topic,
partition,
offset,
timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP),
timestampType.map(TimestampType.forName).getOrElse(TimestampType.NO_TIMESTAMP_TYPE),
checksum.getOrElse(ConsumerRecord.NULL_CHECKSUM.toLong),
serializedKeySize.getOrElse(ConsumerRecord.NULL_SIZE),
serializedValueSize.getOrElse(ConsumerRecord.NULL_SIZE),
key.map(_.getBytes()).orNull,
value.map(_.getBytes()).orNull,
ConsumerRecordUtils.toHeaders(headers.mapValues(v => v.orNull)),
java.util.Optional.ofNullable(leaderEpoch.map(Integer.valueOf).orNull)
)
}
}
}
implicit val consumerRecordDecoder: Decoder[SerializableConsumerRecord[K, V]] = deriveDecoder
implicit val consumerRecordEncoder: Encoder[SerializableConsumerRecord[K, V]] = deriveEncoder

override protected def formatRecord(record: ConsumerRecord[Array[Byte], Array[Byte]]): Array[Byte] = {
implicitly[Encoder[ConsumerRecord[Array[Byte], Array[Byte]]]].apply(record).noSpaces.getBytes(cs)
val deserializedRecord = deserializationSchema.deserialize(record)
val serializableRecord = SerializableConsumerRecord(
Option(deserializedRecord.key()),
deserializedRecord.value(),
Option(deserializedRecord.topic()),
Option(deserializedRecord.partition()),
Option(deserializedRecord.offset()),
Option(deserializedRecord.timestamp()),
Option(ConsumerRecordUtils.toMap(deserializedRecord.headers()).mapValues(s => Option(s)))
)
implicitly[Encoder[SerializableConsumerRecord[K, V]]].apply(serializableRecord).noSpaces.getBytes(StandardCharsets.UTF_8)
}

override protected def parseRecord(topic: String, bytes: Array[Byte]): ConsumerRecord[Array[Byte], Array[Byte]] = {
CirceUtil.decodeJsonUnsafe[ConsumerRecord[Array[Byte], Array[Byte]]](bytes)
val serializableRecord = CirceUtil.decodeJsonUnsafe[SerializableConsumerRecord[K, V]](bytes) // decode json in SerializableConsumerRecord[K, V] domain
val serializableConsumerRecord = SerializableConsumerRecord.from(topic, serializableRecord) // update with defaults if fields are missing in json
// Here serialization schema and ProducerRecord are used to transform key and value to proper Array[Byte].
// Other properties are ignored by serializer and are based on values provided by decoded json (or default empty values).
val producerRecord = serializationSchema.serialize(serializableConsumerRecord, serializableConsumerRecord.timestamp()) // serialize K and V to Array[Byte]
createConsumerRecord(
serializableConsumerRecord.topic,
serializableConsumerRecord.partition,
serializableConsumerRecord.offset,
serializableConsumerRecord.timestamp,
producerRecord.key(),
producerRecord.value(),
producerRecord.headers()
)
}

override protected def testDataSplit: TestDataSplit = TestParsingUtils.newLineSplit

}


case class SerializableConsumerRecord[K, V](key: Option[K], value: V, topic: Option[String], partition: Option[Int], offset: Option[Long], timestamp: Option[Long], headers: Option[Map[String, Option[String]]])

object SerializableConsumerRecord {

def createConsumerRecord[K, V](topic: String, partition: Int, offset: Long, timestamp: Long, key: K, value: V, headers: Headers): ConsumerRecord[K, V] = {
new ConsumerRecord(topic, partition, offset, timestamp,
TimestampType.NO_TIMESTAMP_TYPE, ConsumerRecord.NULL_CHECKSUM.longValue(),
ConsumerRecord.NULL_SIZE, ConsumerRecord.NULL_SIZE,
key, value, headers
)
}

def from[K, V](topic: String, record: SerializableConsumerRecord[K, V]): ConsumerRecord[K, V] = {
createConsumerRecord(
record.topic.getOrElse(topic),
record.partition.getOrElse(0),
record.offset.getOrElse(0L),
record.timestamp.getOrElse(ConsumerRecord.NO_TIMESTAMP),
record.key.getOrElse(null.asInstanceOf[K]),
record.value,
ConsumerRecordUtils.toHeaders(record.headers.map(_.mapValues(_.orNull)).getOrElse(Map.empty))
)
}
}
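
With the derived codecs, test records are now (de)serialized in the SerializableConsumerRecord[K, V] domain instead of as hand-encoded ConsumerRecord JSON. A rough sketch of the resulting JSON shape for a String-keyed Transaction record (the Transaction fields are assumed, as in the earlier sketch):

    import io.circe.generic.semiauto.deriveEncoder
    import io.circe.syntax._
    import io.circe.Encoder
    import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord

    object SerializableConsumerRecordJsonShape extends App {
      // Assumed value type, as in the earlier sketch.
      case class Transaction(clientId: String, amount: Int, eventDate: Long)
      implicit val transactionEncoder: Encoder[Transaction] = deriveEncoder
      implicit val recordEncoder: Encoder[SerializableConsumerRecord[String, Transaction]] = deriveEncoder

      val record = SerializableConsumerRecord(
        key = Some("client1"),
        value = Transaction("client1", 12, 1620000000000L),
        topic = Some("transactions"),
        partition = None, offset = None, timestamp = None, headers = None
      )

      // Prints roughly:
      // {"key":"client1","value":{"clientId":"client1","amount":12,"eventDate":1620000000000},
      //  "topic":"transactions","partition":null,"offset":null,"timestamp":null,"headers":null}
      println(record.asJson.noSpaces)
    }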