experiment - remove intermediate raw source
mslabek committed Apr 10, 2024
1 parent f1c8e6c commit f92d467
Showing 5 changed files with 25 additions and 54 deletions.
File 1 of 5
@@ -19,41 +19,6 @@ import pl.touk.nussknacker.engine.api.typed.typing.Unknown
 import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
 import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
 
-/**
- * Source with typical source stream transformations:
- * 1. adds source using provided SourceFunction (here raw source produces raw data)
- * 2. sets UID and overrides source name
- * 3. assigns timestamp and watermarks
- * 4. initializes Context that is streamed within output DataStream (here raw data are mapped to Context)
- * It separates raw event data produced by SourceFunction and data released to the stream as Context variables.
- * By default it uses basic "single input value" implementation of initializer, see [[BasicContextInitializer]].
- *
- * @tparam Raw - type of raw event that is generated by flink source function.
- */
-// TODO: this should extend FlinkSource not Source
-trait FlinkIntermediateRawSource[Raw] extends CustomizableContextInitializerSource[Raw] { self: Source =>
-
-  // We abstracting to stream so theoretically it shouldn't be defined on this level but:
-  // * for test mechanism purpose we need to know what type will be generated.
-  // * for production sources (eg BaseFlinkSource, KafkaSource) it is used to determine type information for flinkSourceFunction
-  def typeInformation: TypeInformation[Raw]
-
-  def timestampAssigner: Option[TimestampWatermarkHandler[Raw]]
-
-  override val contextInitializer: ContextInitializer[Raw] = FlinkStandardSourceUtils.defaultContextInitializer
-
-  @silent("deprecated")
-  def prepareSourceStream(
-      env: StreamExecutionEnvironment,
-      flinkNodeContext: FlinkCustomNodeContext,
-      sourceFunction: SourceFunction[Raw]
-  ): DataStream[Context] = {
-    val source = FlinkStandardSourceUtils.createSource(env, sourceFunction, typeInformation)
-    FlinkStandardSourceUtils.prepareSource(source, flinkNodeContext, timestampAssigner, Some(contextInitializer))
-  }
-
-}
-
 object FlinkStandardSourceUtils extends ExplicitUidInOperatorsSupport {
 
   def defaultContextInitializer[Raw] = new BasicContextInitializer[Raw](Unknown)
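
The rest of FlinkStandardSourceUtils is collapsed in this view. As rough orientation, the signatures of its two helpers can be inferred from the call sites visible in this diff; the sketch below is a hypothetical reconstruction (renamed object, placeholder bodies), not the actual implementation:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.process.ContextInitializer
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomNodeContext
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler

// Sketch only: signatures inferred from the call sites in this diff; the real helpers live in
// FlinkStandardSourceUtils, whose body is collapsed above.
object FlinkStandardSourceUtilsSketch {

  // Presumably wraps env.addSource(sourceFunction, typeInformation) so the explicit
  // TypeInformation is attached to the created DataStreamSource.
  def createSource[Raw](
      env: StreamExecutionEnvironment,
      sourceFunction: SourceFunction[Raw],
      typeInformation: TypeInformation[Raw]
  ): DataStreamSource[Raw] = ???

  // Presumably applies the steps the removed trait documented: set UID and source name,
  // optionally assign timestamps and watermarks, and map each raw element to a Context.
  def prepareSource[Raw](
      source: DataStreamSource[Raw],
      flinkNodeContext: FlinkCustomNodeContext,
      timestampAssigner: Option[TimestampWatermarkHandler[Raw]],
      contextInitializer: Option[ContextInitializer[Raw]]
  ): DataStream[Context] = ???
}
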
File 2 of 5
@@ -46,7 +46,11 @@ trait FlinkSourceTestSupport[Raw] extends SourceTestSupport[Raw] { self: Source
  * @tparam Raw - type of raw event that is generated by flink source function.
  *   This is needed to handle e.g. syntax suggestions in UI (in sources with explicite @MethodToInvoke).
  */
-trait BasicFlinkSource[Raw] extends FlinkSource with FlinkIntermediateRawSource[Raw] {
+trait BasicFlinkSource[Raw]
+    extends FlinkSource
+    with CustomizableContextInitializerSource[Raw]
+    with CustomizableTimestampWatermarkHandlerSource[Raw]
+    with ExplicitTypeInformationSource[Raw] {
 
   @silent("deprecated")
   def flinkSourceFunction: SourceFunction[Raw]
@@ -55,13 +59,16 @@ trait BasicFlinkSource[Raw] extends FlinkSource with FlinkIntermediateRawSource[
       env: StreamExecutionEnvironment,
       flinkNodeContext: FlinkCustomNodeContext
   ): DataStream[Context] = {
-    prepareSourceStream(env, flinkNodeContext, flinkSourceFunction)
+    val source = FlinkStandardSourceUtils.createSource(env, flinkSourceFunction, typeInformation)
+    FlinkStandardSourceUtils.prepareSource(source, flinkNodeContext, timestampAssigner, Some(contextInitializer))
   }
 
+  override def contextInitializer: ContextInitializer[Raw] = new BasicContextInitializer[Raw](Unknown)
+
 }
 
 /**
- * Source providing default logic for transforming a `DataStreamSource` into `DataStream[Context]`. Fulfills
+ * Source providing default logic for transforming a `DataStreamSource[Raw]` into `DataStream[Context]`. Fulfills
  * the same role as `BasicFlinkSource`, but is based on `DataStreamSource` instead of `SourceFunction`.
  *
  * @tparam Raw - type of raw event that is generated by flink source function.
@@ -95,3 +102,7 @@ trait CustomizableContextInitializerSource[T] { self: Source =>
 trait CustomizableTimestampWatermarkHandlerSource[T] { self: Source =>
   def timestampAssigner: Option[TimestampWatermarkHandler[T]]
 }
+
+trait ExplicitTypeInformationSource[T] { self: Source =>
+  def typeInformation: TypeInformation[T]
+}
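
A hypothetical usage sketch (not part of this commit): after the change, a minimal source built on BasicFlinkSource supplies only the SourceFunction, the TypeInformation and an optional timestamp assigner, while the context initializer falls back to the default defined above. The package path of BasicFlinkSource and the import of the @silent annotation are assumptions based on the imports visible elsewhere in this diff:

import com.github.ghik.silencer.silent
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSource
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler

// Hypothetical example: a source emitting a single constant value.
class ConstantSource(value: String) extends BasicFlinkSource[String] {

  // Required by ExplicitTypeInformationSource.
  override val typeInformation: TypeInformation[String] = TypeInformation.of(classOf[String])

  // Required by CustomizableTimestampWatermarkHandlerSource; no event-time handling here.
  override def timestampAssigner: Option[TimestampWatermarkHandler[String]] = None

  @silent("deprecated")
  override def flinkSourceFunction: SourceFunction[String] = new SourceFunction[String] {
    override def run(ctx: SourceFunction.SourceContext[String]): Unit = ctx.collect(value)
    override def cancel(): Unit = ()
  }
}
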
File 3 of 5
@@ -22,7 +22,7 @@ import pl.touk.nussknacker.engine.definition.fragment.{
   FragmentComponentDefinition,
   FragmentParametersDefinitionExtractor
 }
-import pl.touk.nussknacker.engine.flink.api.process.{FlinkIntermediateRawSource, FlinkSourceTestSupport}
+import pl.touk.nussknacker.engine.flink.api.process.{CustomizableContextInitializerSource, FlinkSourceTestSupport}
 import pl.touk.nussknacker.engine.flink.api.timestampwatermark.TimestampWatermarkHandler
 import pl.touk.nussknacker.engine.graph.node.FragmentInputDefinition
@@ -50,7 +50,7 @@ class StubbedFragmentSourceDefinitionPreparer(
 
   private def buildSource(inputParameters: List[Parameter]): Source = {
     new Source
-      with FlinkIntermediateRawSource[Map[String, Any]]
+      with CustomizableContextInitializerSource[Map[String, Any]]
       with FlinkSourceTestSupport[Map[String, Any]]
      with TestWithParametersSupport[Map[String, Any]] {
       override def timestampAssignerForTest: Option[TimestampWatermarkHandler[Map[String, Any]]] = None
@@ -60,8 +60,6 @@
 
       override def testRecordParser: TestRecordParser[Map[String, Any]] = ???
 
-      override def timestampAssigner: Option[TimestampWatermarkHandler[Map[String, Any]]] = None
-
       override def testParametersDefinition: List[Parameter] = inputParameters
 
       override def parametersToTestData(params: Map[ParameterName, AnyRef]): Map[String, Any] =
File 4 of 5
@@ -12,7 +12,6 @@ import pl.touk.nussknacker.engine.definition.component.ComponentDefinitionWithIm
 import pl.touk.nussknacker.engine.flink.api.exception.FlinkEspExceptionConsumer
 import pl.touk.nussknacker.engine.flink.api.process.{
   CustomizableContextInitializerSource,
-  FlinkIntermediateRawSource,
   FlinkSource,
   FlinkSourceTestSupport
 }
File 5 of 5
@@ -5,25 +5,25 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.functions.RuntimeContext
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.datastream.DataStreamSource
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import pl.touk.nussknacker.engine.api.NodeId
 import pl.touk.nussknacker.engine.api.definition.Parameter
 import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy
 import pl.touk.nussknacker.engine.api.parameter.ParameterName
 import pl.touk.nussknacker.engine.api.process.{ContextInitializer, TestWithParametersSupport}
 import pl.touk.nussknacker.engine.api.runtimecontext.{ContextIdGenerator, EngineRuntimeContext}
 import pl.touk.nussknacker.engine.api.test.{TestRecord, TestRecordParser}
-import pl.touk.nussknacker.engine.api.{Context, NodeId}
 import pl.touk.nussknacker.engine.flink.api.compat.ExplicitUidInOperatorsSupport
 import pl.touk.nussknacker.engine.flink.api.exception.ExceptionHandler
 import pl.touk.nussknacker.engine.flink.api.process.{
   FlinkCustomNodeContext,
-  FlinkIntermediateRawSource,
-  FlinkSource,
-  FlinkSourceTestSupport
+  FlinkSourceTestSupport,
+  FlinkStandardSourceUtils,
+  StandardFlinkSource
 }
 import pl.touk.nussknacker.engine.flink.api.timestampwatermark.StandardTimestampWatermarkHandler.SimpleSerializableTimestampAssigner
 import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{
@@ -52,8 +52,7 @@ class FlinkKafkaSource[T](
     testParametersInfo: KafkaTestParametersInfo,
     overriddenConsumerGroup: Option[String] = None,
     namingStrategy: NamingStrategy
-) extends FlinkSource
-    with FlinkIntermediateRawSource[T]
+) extends StandardFlinkSource[T]
    with Serializable
    with FlinkSourceTestSupport[T]
    with RecordFormatterBaseTestDataGenerator
@@ -62,14 +61,13 @@
 
   protected lazy val topics: List[String] = preparedTopics.map(_.prepared)
 
-  override def sourceStream(
+  override def initialSourceStream(
       env: StreamExecutionEnvironment,
       flinkNodeContext: FlinkCustomNodeContext
-  ): DataStream[Context] = {
+  ): DataStreamSource[T] = {
     val consumerGroupId = prepareConsumerGroupId(flinkNodeContext)
     val sourceFunction = flinkSourceFunction(consumerGroupId, flinkNodeContext)
-
-    prepareSourceStream(env, flinkNodeContext, sourceFunction)
+    FlinkStandardSourceUtils.createSource(env, sourceFunction, typeInformation)
   }
 
   override val typeInformation: TypeInformation[T] = {
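
For comparison, a hypothetical non-Kafka source following the same pattern as the reworked FlinkKafkaSource would only build the initial DataStreamSource. StandardFlinkSource itself is not part of this diff, so the sketch assumes it applies the UID, watermark and Context-initialization steps on its own and provides defaults for any remaining members:

import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.flink.api.process.{FlinkCustomNodeContext, StandardFlinkSource}

// Hypothetical bounded source producing the numbers 0..99; everything beyond building the
// DataStreamSource is left to StandardFlinkSource (assumption: it supplies the defaults).
class NumberRangeSource extends StandardFlinkSource[java.lang.Long] {

  override def initialSourceStream(
      env: StreamExecutionEnvironment,
      flinkNodeContext: FlinkCustomNodeContext
  ): DataStreamSource[java.lang.Long] =
    env.fromSequence(0L, 99L)
}
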
