Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mslabek committed Apr 10, 2024
1 parent b70b84b commit 3958841
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ trait FlinkIntermediateRawSource[Raw]
flinkNodeContext: FlinkCustomNodeContext,
sourceFunction: SourceFunction[Raw]
): DataStream[Context] = {
val source = FlinkStandardSourceUtils.createSource(env, sourceFunction, typeInformation)
FlinkStandardSourceUtils.prepareSource(source, flinkNodeContext, timestampAssigner, contextInitializer)
FlinkIntermediateRawSourceUtils.prepareSourceStream(
env,
flinkNodeContext,
sourceFunction,
typeInformation,
timestampAssigner,
contextInitializer
)
}

}

object FlinkIntermediateRawSource {
object FlinkIntermediateRawSourceUtils {

@silent("deprecated")
def prepareSourceStream[Raw](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,14 @@ trait BasicFlinkSource[Raw]
env: StreamExecutionEnvironment,
flinkNodeContext: FlinkCustomNodeContext
): DataStream[Context] = {
val source = FlinkStandardSourceUtils.createSource(env, flinkSourceFunction, typeInformation)
FlinkStandardSourceUtils.prepareSource(source, flinkNodeContext, timestampAssigner, contextInitializer)
FlinkIntermediateRawSourceUtils.prepareSourceStream(
env = env,
flinkNodeContext = flinkNodeContext,
sourceFunction = flinkSourceFunction,
typeInformation = typeInformation,
timestampWatermarkHandler = timestampAssigner,
contextInitializer = contextInitializer
)
}

}
Expand All @@ -70,7 +76,6 @@ trait BasicFlinkSource[Raw]
*
* @tparam Raw - type of raw event that is generated by flink source function.
*/
// TODO local: better name?
trait StandardFlinkSource[Raw]
extends FlinkSource
with CustomizableContextInitializerSource[Raw]
Expand All @@ -95,14 +100,14 @@ trait StandardFlinkSource[Raw]

}

trait CustomizableContextInitializerSource[T] { self: Source =>
def contextInitializer: ContextInitializer[T] = new BasicContextInitializer[T](Unknown)
trait CustomizableContextInitializerSource[Raw] { self: Source =>
def contextInitializer: ContextInitializer[Raw] = new BasicContextInitializer[Raw](Unknown)
}

trait CustomizableTimestampWatermarkHandlerSource[T] { self: Source =>
def timestampAssigner: Option[TimestampWatermarkHandler[T]]
trait CustomizableTimestampWatermarkHandlerSource[Raw] { self: Source =>
def timestampAssigner: Option[TimestampWatermarkHandler[Raw]]
}

trait ExplicitTypeInformationSource[T] { self: Source =>
def typeInformation: TypeInformation[T]
trait ExplicitTypeInformationSource[Raw] { self: Source =>
def typeInformation: TypeInformation[Raw]
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.FromElementsFunction
import pl.touk.nussknacker.engine.api.process.{BasicContextInitializer, ContextInitializer}
import pl.touk.nussknacker.engine.api.typed.ReturningType
import pl.touk.nussknacker.engine.api.typed.typing.TypingResult
import pl.touk.nussknacker.engine.api.typed.typing.{TypingResult, Unknown}
import pl.touk.nussknacker.engine.flink.api.process.{
ExplicitTypeInformationSource,
FlinkCustomNodeContext,
FlinkStandardSourceUtils,
StandardFlinkSource
Expand All @@ -25,6 +27,7 @@ case class CollectionSource[T: TypeInformation](
boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
flinkRuntimeMode: Option[RuntimeExecutionMode] = None
) extends StandardFlinkSource[T]
with ExplicitTypeInformationSource[T]
with ReturningType {

@silent("deprecated")
Expand All @@ -41,9 +44,10 @@ case class CollectionSource[T: TypeInformation](
FlinkStandardSourceUtils.createSource(
env = env,
sourceFunction = new FromElementsFunction[T](list.filterNot(_ == null).asJava),
typeInformation = implicitly[TypeInformation[T]]
typeInformation = typeInformation
)
}
}

override def typeInformation: TypeInformation[T] = implicitly[TypeInformation[T]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,6 @@ class StubbedFlinkProcessCompilerDataFactoryTest extends AnyFunSuite with Matche

override def parametersToTestData(params: Map[ParameterName, AnyRef]): Int =
params(ParameterName("input")).asInstanceOf[Int]

override def typeInformation: TypeInformation[Int] = TypeInformation.of(classOf[Int])
}

object SampleTestSupportSource
Expand All @@ -194,7 +192,6 @@ class StubbedFlinkProcessCompilerDataFactoryTest extends AnyFunSuite with Matche
override def timestampAssignerForTest: Option[TimestampWatermarkHandler[Int]] = None
override def testRecordParser: TestRecordParser[Int] = (testRecord: TestRecord) =>
CirceUtil.decodeJsonUnsafe[Int](testRecord.json)
override def typeInformation: TypeInformation[Int] = TypeInformation.of(classOf[Int])
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ object GenericSourceWithCustomVariablesSample
CirceUtil.decodeJsonUnsafe[String](testRecord.json)

override def timestampAssignerForTest: Option[TimestampWatermarkHandler[String]] = timestampAssigner

override def typeInformation: TypeInformation[ProcessingType] = TypeInformation.of(classOf[String])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,6 @@ object SampleNodes {
CirceUtil.decodeJsonUnsafe[String](testRecord.json)

override def timestampAssignerForTest: Option[TimestampWatermarkHandler[String]] = timestampAssigner

override def typeInformation: TypeInformation[ProcessingType] = TypeInformation.of(classOf[String])
}
}

Expand Down Expand Up @@ -1039,8 +1037,6 @@ object SampleNodes {
with FlinkSourceTestSupport[SimpleRecord] {
override def testRecordParser: TestRecordParser[SimpleRecord] = simpleRecordParser
override def timestampAssignerForTest: Option[TimestampWatermarkHandler[SimpleRecord]] = timestampAssigner

override def typeInformation: TypeInformation[SimpleRecord] = TypeInformation.of(classOf[SimpleRecord])
}
)

Expand All @@ -1053,8 +1049,6 @@ object SampleNodes {
}

override def timestampAssignerForTest: Option[TimestampWatermarkHandler[SimpleJsonRecord]] = timestampAssigner

override def typeInformation: TypeInformation[SimpleJsonRecord] = TypeInformation.of(classOf[SimpleJsonRecord])
}
)

Expand All @@ -1070,8 +1064,6 @@ object SampleNodes {
with FlinkSourceTestSupport[TypedMap]
with ReturningType {

override def typeInformation: TypeInformation[TypedMap] = TypeInformation.of(classOf[TypedMap])

override def testRecordParser: TestRecordParser[TypedMap] = (testRecord: TestRecord) => {
TypedMap(CirceUtil.decodeJsonUnsafe[Map[String, String]](testRecord.json, "invalid request"))
}
Expand Down

0 comments on commit 3958841

Please sign in to comment.