Add kafka source with additional variable with metadata. #1512

Merged: 10 commits, May 18, 2021
2 changes: 1 addition & 1 deletion build.sbt
@@ -586,7 +586,7 @@ lazy val process = (project in engine("flink/process")).
"org.apache.flink" %% "flink-statebackend-rocksdb" % flinkV % "provided"
)
}
-).dependsOn(flinkUtil, interpreter, kafka % "test", kafkaTestUtil % "test", kafkaFlinkUtil % "test", flinkTestUtil % "test")
+).dependsOn(flinkUtil, interpreter, kafka % "test", kafkaTestUtil % "test", kafkaFlinkUtil % "test->test", flinkTestUtil % "test")

lazy val interpreter = (project in engine("interpreter")).
settings(commonSettings).
1 change: 1 addition & 0 deletions docs/Changelog.md
@@ -43,6 +43,7 @@ Nussknacker versions
* [#1543](https://github.com/TouK/nussknacker/pull/1543) `ComponentProvider` API enables adding new extensions without changing e.g. `ProcessConfigCreator`
* [#1471](https://github.com/TouK/nussknacker/pull/1471) Initial version of session window aggregate added (API may change in the future).
* [#1631](https://github.com/TouK/nussknacker/pull/1631) Ability to use multiple config files with `nussknacker.config.location` system property
* [#1512](https://github.com/TouK/nussknacker/pull/1512) `KafkaSourceFactory` is replaced with a source that provides an additional #inputMeta variable with the event's metadata.

0.3.1 (not released yet)
------------------------
19 changes: 16 additions & 3 deletions docs/MigrationGuide.md
@@ -5,7 +5,7 @@ To see biggest differences please consult the [changelog](Changelog.md).
## In version 0.4.0 (not released yet)

* [#1542](https://github.com/TouK/nussknacker/pull/1542)
`KafkaConfig` now has a new parameter `topicsExistenceValidationConfig`. When `topicsExistenceValidationConfig.enabled = true`,
Kafka sources/sinks fail validation when the provided topic does not exist and the cluster is configured with `auto.create.topics.enable=false`
* [#1416](https://github.com/TouK/nussknacker/pull/1416)
`OAuth2Service` has changed. You can still use your old implementation by importing `OAuth2OldService` with an alias.
@@ -58,11 +58,24 @@ To see biggest differences please consult the [changelog](Changelog.md).
- `BaseKafkaAvroSourceFactory` is able to use both key and value schema determiners to build a proper DeserializationSchema (support for keys is not fully introduced in this change)
* [#1514](https://github.com/TouK/nussknacker/pull/1514) `ExecutionConfigPreparer` has different method parameter - `JobData`, which has more info than previous parameters
* [#1532](https://github.com/TouK/nussknacker/pull/1532) `TypedObjectTypingResult#fields` uses now `scala.collection.immutable.ListMap` to keep fields order
* [#1546](https://github.com/TouK/nussknacker/pull/1546) `StandaloneCustomTransformer` now takes a list of `Context` objects, to process them in one go
* [#1557](https://github.com/TouK/nussknacker/pull/1556) Some classes from the standalone engine were moved to the standalone API to remove the engine's dependency on (model) utils:
`StandaloneContext`, `StandaloneContextLifecycle`, `MetricsProvider`
* [#1558](https://github.com/TouK/nussknacker/pull/1558) `FlinkProcessRegistrar` takes configuration directly from `FlinkProcessCompiler` (this can affect some test setups)
* [#1631](https://github.com/TouK/nussknacker/pull/1631) Introduction of `nussknacker.config.locations` property, drop use of standard `config.file` property. Model configuration no longer has direct access to root UI config.
* [#1512](https://github.com/TouK/nussknacker/pull/1512)
- Replaced `KafkaSourceFactory` with a source based on `GenericNodeTransformation`, which gives access to the setup of `ValidationContext` and `Context` initialization.
To migrate `KafkaSourceFactory` (see the sketch after this list):
- provide deserializer factory (source factory requires deserialization to `ConsumerRecord`):
- use `ConsumerRecordDeserializationSchemaFactory` with the current `DeserializationSchema` as a value deserializer, and add a key deserializer (e.g. `org.apache.kafka.common.serialization.StringDeserializer`)
- or use `FixedValueDeserializaitionSchemaFactory` with simple key-as-string deserializer
- the current `RecordFormatter` should be sufficient for value-only serialization; use `ConsumerRecordToJsonFormatter` for full key-value-and-metadata serialization
- provide a timestampAssigner that is able to extract the event time from `ConsumerRecord[K, V]`
- Removed `BaseKafkaSourceFactory` with multiple topics support:
use `KafkaSourceFactory` instead, see "source with two input topics" test case
- Removed `SingleTopicKafkaSourceFactory`:
use `KafkaSourceFactory` with custom `prepareInitialParameters`, `contextTransformation` and `extractTopics` to alter parameter list and provide constant topic value.
- `TypingResultAwareTypeInformationCustomisation` is moved to package `pl.touk.nussknacker.engine.flink.api.typeinformation`
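
  A minimal sketch of such a migration for a value-only JSON source is shown below. The event class `MyEvent`, its decoder and the helper name are illustrative only (not part of this PR); the Nussknacker classes used are the ones referenced in the points above:

  ```scala
  import java.time.Duration

  import io.circe.generic.auto._
  import org.apache.flink.api.scala._
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import pl.touk.nussknacker.engine.api.CirceUtil.decodeJsonUnsafe
  import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
  import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler
  import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
  import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory
  import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter
  import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory

  import scala.reflect.classTag

  object MigratedSourceExample {

    // Illustrative event type, standing in for an existing domain class.
    case class MyEvent(id: String, eventDate: Long)

    def eventSourceFactory(deps: ProcessObjectDependencies): KafkaSourceFactory[String, MyEvent] = {
      // Reuse the existing value-only DeserializationSchema; keys are read as plain Strings.
      val valueSchema = new EspDeserializationSchema[MyEvent](bytes => decodeJsonUnsafe[MyEvent](bytes))
      val deserializationFactory = new FixedValueDeserializaitionSchemaFactory(valueSchema)

      // The timestamp assigner now sees the whole ConsumerRecord, so event time is taken from its value.
      val timestampAssigner = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, MyEvent]](
        _.value().eventDate, Duration.ofMinutes(10))

      // JsonRecordFormatter is still sufficient for value-only test data.
      new KafkaSourceFactory[String, MyEvent](
        deserializationFactory, Some(timestampAssigner), JsonRecordFormatter, deps
      )(classTag[String], classTag[MyEvent])
    }
  }
  ```

  Once a scenario uses such a source, the event metadata should be available under the new `#inputMeta` variable (e.g. `#inputMeta.topic` or `#inputMeta.partition`, assuming the field names mirror Kafka's `ConsumerRecord`).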

## In version 0.3.0

@@ -12,6 +12,7 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.ProcessListener;
import pl.touk.nussknacker.engine.api.Service;
@@ -22,12 +23,12 @@
import pl.touk.nussknacker.engine.api.process.SourceFactory;
import pl.touk.nussknacker.engine.api.process.WithCategories;
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender;
import pl.touk.nussknacker.engine.api.test.TestParsingUtils;
import pl.touk.nussknacker.engine.demo.LoggingExceptionHandlerFactory;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler;
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler;
import pl.touk.nussknacker.engine.javaapi.process.ExpressionConfig;
import pl.touk.nussknacker.engine.javaapi.process.ProcessConfigCreator;
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory;
import pl.touk.nussknacker.engine.kafka.generic.sources;
import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchemaFactory;
import pl.touk.nussknacker.engine.kafka.serialization.schemas;
@@ -58,16 +59,16 @@ public Map<String, WithCategories<Service>> services(ProcessObjectDependencies p

@Override
public Map<String, WithCategories<SourceFactory<?>>> sourceFactories(ProcessObjectDependencies processObjectDependencies) {
-KafkaSourceFactory<Transaction> sourceFactory = getTransactionKafkaSourceFactory(processObjectDependencies);
+KafkaSourceFactory<String, Transaction> sourceFactory = getTransactionKafkaSourceFactory(processObjectDependencies);
Map<String, WithCategories<SourceFactory<?>>> m = new HashMap<>();
m.put("kafka-transaction", all(sourceFactory));
return m;
}

-private KafkaSourceFactory<Transaction> getTransactionKafkaSourceFactory(ProcessObjectDependencies processObjectDependencies) {
-TimestampWatermarkHandler<Transaction> extractor = new StandardTimestampWatermarkHandler<>(WatermarkStrategy
-.<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(10))
-.withTimestampAssigner((SerializableTimestampAssigner<Transaction>) (element, recordTimestamp) -> element.eventDate));
+private KafkaSourceFactory<String, Transaction> getTransactionKafkaSourceFactory(ProcessObjectDependencies processObjectDependencies) {
+TimestampWatermarkHandler<ConsumerRecord<String, Transaction>> extractor = new StandardTimestampWatermarkHandler<>(WatermarkStrategy
+.<ConsumerRecord<String, Transaction>>forBoundedOutOfOrderness(Duration.ofMinutes(10))
+.withTimestampAssigner((SerializableTimestampAssigner<ConsumerRecord<String, Transaction>>) (element, recordTimestamp) -> element.value().eventDate));

DeserializationSchema<Transaction> schema = new DeserializationSchema<Transaction>() {
@Override
@@ -86,10 +87,11 @@ public TypeInformation<Transaction> getProducedType() {
}
};
return new KafkaSourceFactory<>(
-schema,
+new FixedValueDeserializaitionSchemaFactory<>(schema),
Option.apply(extractor),
sources.JsonRecordFormatter$.MODULE$,
processObjectDependencies,
ClassTag$.MODULE$.apply(String.class),
ClassTag$.MODULE$.apply(Transaction.class)
);
}
@@ -1,30 +1,33 @@
package pl.touk.nussknacker.engine.demo

import java.time.Duration

import com.typesafe.config.Config
import io.circe.Json
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import pl.touk.nussknacker.engine.api.CirceUtil.decodeJsonUnsafe
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.exception.{EspExceptionHandler, ExceptionHandlerFactory}
import pl.touk.nussknacker.engine.api.process.{ProcessObjectDependencies, _}
import pl.touk.nussknacker.engine.api.signal.ProcessSignalSender
import pl.touk.nussknacker.engine.api.test.TestParsingUtils
import pl.touk.nussknacker.engine.demo.custom.{EventsCounter, TransactionAmountAggregator}
import pl.touk.nussknacker.engine.demo.service.{AlertService, ClientService}
import pl.touk.nussknacker.engine.flink.api.process.FlinkSourceFactory
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler}
import pl.touk.nussknacker.engine.flink.util.exception.BrieflyLoggingExceptionHandler
import pl.touk.nussknacker.engine.flink.util.source.EspDeserializationSchema
import pl.touk.nussknacker.engine.flink.util.transformer.{TransformStateTransformer, UnionTransformer}
import pl.touk.nussknacker.engine.kafka.consumerrecord.FixedValueDeserializaitionSchemaFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.JsonRecordFormatter
import pl.touk.nussknacker.engine.kafka.serialization.schemas.SimpleSerializationSchema
import pl.touk.nussknacker.engine.kafka.sink.KafkaSinkFactory
import pl.touk.nussknacker.engine.kafka.source.KafkaSourceFactory
import pl.touk.nussknacker.engine.util.LoggingListener

-import scala.reflect.ClassTag
+import scala.reflect.{ClassTag, classTag}

class DemoProcessConfigCreator extends ProcessConfigCreator {

@@ -58,7 +61,7 @@ class DemoProcessConfigCreator extends ProcessConfigCreator {
}

private def createTransactionSource(processObjectDependencies: ProcessObjectDependencies) = {
-val transactionTimestampExtractor = StandardTimestampWatermarkHandler.boundedOutOfOrderness[Transaction](_.eventDate, Duration.ofMinutes(10))
+val transactionTimestampExtractor = StandardTimestampWatermarkHandler.boundedOutOfOrderness[ConsumerRecord[String, Transaction]](_.value().eventDate, Duration.ofMinutes(10))
kafkaSource[Transaction](decodeJsonUnsafe[Transaction](_), Some(transactionTimestampExtractor), processObjectDependencies)
}

@@ -67,10 +70,11 @@ class DemoProcessConfigCreator {
}

private def kafkaSource[T: TypeInformation](decode: Array[Byte] => T,
-timestampAssigner: Option[TimestampWatermarkHandler[T]],
-processObjectDependencies: ProcessObjectDependencies): SourceFactory[T] = {
+timestampAssigner: Option[TimestampWatermarkHandler[ConsumerRecord[String, T]]],
+processObjectDependencies: ProcessObjectDependencies): FlinkSourceFactory[ConsumerRecord[String, T]] = {
val schema = new EspDeserializationSchema[T](bytes => decode(bytes))
-new KafkaSourceFactory[T](schema, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(ClassTag(implicitly[TypeInformation[T]].getTypeClass))
+val schemaFactory = new FixedValueDeserializaitionSchemaFactory(schema)
+new KafkaSourceFactory[String, T](schemaFactory, timestampAssigner, JsonRecordFormatter, processObjectDependencies)(classTag[String], ClassTag(implicitly[TypeInformation[T]].getTypeClass))
}

override def sinkFactories(processObjectDependencies: ProcessObjectDependencies): Map[String, WithCategories[SinkFactory]] = {
@@ -19,4 +19,6 @@ trait TypeInformationDetection extends Serializable {

def forValueWithContext[T](validationContext: ValidationContext, value: TypingResult): TypeInformation[ValueWithContext[T]]

def forType(typingResult: TypingResult): TypeInformation[Any]

}
@@ -1,4 +1,4 @@
-package pl.touk.nussknacker.engine.process.typeinformation
+package pl.touk.nussknacker.engine.flink.api.typeinformation

import org.apache.flink.api.common.typeinfo.TypeInformation
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
@@ -9,6 +9,6 @@ import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
*/
trait TypingResultAwareTypeInformationCustomisation {

-def customise(originalDetection: TypingResultAwareTypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]]
+def customise(originalDetection: TypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]]

}
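
For illustration, a custom implementation of this trait could look like the sketch below. The `BigDecimal` case is an invented example (not part of this PR); the PR itself provides `InputMetaTypeInformationCustomisation` for the new `#inputMeta` variable, presumably registered via the one-line service registration file shown further down in this diff:

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.engine.flink.api.typeinformation.{TypeInformationDetection, TypingResultAwareTypeInformationCustomisation}

// Hypothetical customisation: use a dedicated TypeInformation for BigDecimal results
// instead of whatever fallback (e.g. Kryo) the original detection would choose.
class BigDecimalTypeInformationCustomisation extends TypingResultAwareTypeInformationCustomisation {

  override def customise(originalDetection: TypeInformationDetection): PartialFunction[TypingResult, TypeInformation[_]] = {
    case typingResult if typingResult == Typed[java.math.BigDecimal] => Types.BIG_DEC
  }
}
```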
@@ -29,7 +29,7 @@ trait KafkaAvroDeserializationSchemaFactory extends Serializable {
* @return KafkaDeserializationSchema
*/
def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
-keySchemaDataOpt: Option[RuntimeSchemaData] = None,
+keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any]

Expand All @@ -47,7 +47,7 @@ abstract class KafkaAvroValueDeserializationSchemaFactory
protected def createValueTypeInfo[T: ClassTag](schemaDataOpt: Option[RuntimeSchemaData], kafkaConfig: KafkaConfig): TypeInformation[T]

override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
-keySchemaDataOpt: Option[RuntimeSchemaData] = None,
+keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {
new KafkaDeserializationSchema[V] {
@@ -91,7 +91,7 @@ abstract class KafkaAvroKeyValueDeserializationSchemaFactory
protected def createObjectTypeInformation[K: ClassTag, V: ClassTag](keyTypeInformation: TypeInformation[K], valueTypeInformation: TypeInformation[V]): TypeInformation[O]

override def create[K: ClassTag, V: ClassTag](kafkaConfig: KafkaConfig,
-keySchemaDataOpt: Option[RuntimeSchemaData] = None,
+keySchemaDataOpt: Option[RuntimeSchemaData],
valueSchemaDataOpt: Option[RuntimeSchemaData]
): KafkaDeserializationSchema[Any] = {

@@ -0,0 +1 @@
pl.touk.nussknacker.engine.kafka.source.InputMetaTypeInformationCustomisation
@@ -0,0 +1,66 @@
package pl.touk.nussknacker.engine.kafka.consumerrecord

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.serialization.KafkaDeserializationSchemaFactory

import scala.reflect.classTag

/**
* Produces deserialization schema that describes how to turn the Kafka raw [[org.apache.kafka.clients.consumer.ConsumerRecord]]
* (with raw key-value of type Array[Byte]) into deserialized ConsumerRecord[K, V] (with proper key-value types).
* It allows the source to provide event value AND event metadata to the stream.
*
* @tparam K - type of key of deserialized ConsumerRecord
* @tparam V - type of value of deserialized ConsumerRecord
*/
abstract class ConsumerRecordDeserializationSchemaFactory[K, V] extends KafkaDeserializationSchemaFactory[ConsumerRecord[K, V]] with Serializable {

protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[K]

protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[V]

override def create(topics: List[String], kafkaConfig: KafkaConfig): KafkaDeserializationSchema[ConsumerRecord[K, V]] = {

new KafkaDeserializationSchema[ConsumerRecord[K, V]] {

@transient
private lazy val keyDeserializer = createKeyDeserializer(kafkaConfig)
@transient
private lazy val valueDeserializer = createValueDeserializer(kafkaConfig)

override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[K, V] = {
val key = keyDeserializer.deserialize(record.topic(), record.key())
val value = valueDeserializer.deserialize(record.topic(), record.value())
new ConsumerRecord[K, V](
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
ConsumerRecord.NULL_CHECKSUM.toLong, // ignore deprecated checksum
record.serializedKeySize(),
record.serializedValueSize(),
key,
value,
record.headers(),
record.leaderEpoch()
)
}

override def isEndOfStream(nextElement: ConsumerRecord[K, V]): Boolean = false

// TODO: Provide a better way to calculate TypeInformation. Here, in case of serialization (of a generic type), Kryo is used.
// It is assumed that, since the lifespan of this ConsumerRecord[K, V] object is short (it lives only inside the source),
// this implementation is sufficient.
override def getProducedType: TypeInformation[ConsumerRecord[K, V]] = {
val clazz = classTag[ConsumerRecord[K, V]].runtimeClass.asInstanceOf[Class[ConsumerRecord[K, V]]]
TypeInformation.of(clazz)
}
}
}

}
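
As an illustration of how this factory is meant to be extended, a minimal concrete subclass might look like the sketch below. The class and parameter names are hypothetical; `FixedValueDeserializaitionSchemaFactory` elsewhere in this PR already covers the common key-as-String case:

```scala
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.consumerrecord.ConsumerRecordDeserializationSchemaFactory

// Hypothetical factory: String keys, values decoded by a caller-supplied function.
class StringKeyDeserializationSchemaFactory[V](decodeValue: Array[Byte] => V)
  extends ConsumerRecordDeserializationSchemaFactory[String, V] {

  override protected def createKeyDeserializer(kafkaConfig: KafkaConfig): Deserializer[String] =
    new StringDeserializer

  override protected def createValueDeserializer(kafkaConfig: KafkaConfig): Deserializer[V] =
    new Deserializer[V] {
      override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}
      override def deserialize(topic: String, data: Array[Byte]): V = decodeValue(data)
      override def close(): Unit = {}
    }
}
```

Such a factory could then be passed to the new `KafkaSourceFactory[String, V]` in place of `FixedValueDeserializaitionSchemaFactory`.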