Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mslabek committed Apr 9, 2024
1 parent a158081 commit 22bba05
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,7 @@ object GenericSourceWithCustomVariablesSample
timestampAssigner = None,
returnType = Typed[ProcessingType],
customContextInitializer = Some(customContextInitializer)
)(TypeInformation.of(classOf[ProcessingType]))
with TestDataGenerator
with FlinkSourceTestSupport[ProcessingType]
with CustomContextInitializerSource[ProcessingType] {
)(TypeInformation.of(classOf[ProcessingType])) with TestDataGenerator with FlinkSourceTestSupport[ProcessingType] {

override def generateTestData(size: Int): TestData = TestData(
elementsValue.map(el => TestRecord(Json.fromString(el)))
Expand All @@ -106,9 +103,6 @@ object GenericSourceWithCustomVariablesSample
override def timestampAssignerForTest: Option[TimestampWatermarkHandler[String]] = timestampAssigner

override def typeInformation: TypeInformation[ProcessingType] = TypeInformation.of(classOf[String])

// TODO: get rid of get
override def contextInitializer: ContextInitializer[ProcessingType] = customContextInitializer.get
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class FlinkTestScenarioRunner(
def runWithData[I: ClassTag, R](
scenario: CanonicalProcess,
data: List[I],
boundedness: Boundedness = Boundedness.CONTINUOUS_UNBOUNDED,
flinkExecutionMode: RuntimeExecutionMode = RuntimeExecutionMode.AUTOMATIC
boundedness: Boundedness,
flinkExecutionMode: RuntimeExecutionMode
): RunnerListResult[R] = {
implicit val typeInf: TypeInformation[I] =
TypeInformation.of(implicitly[ClassTag[I]].runtimeClass.asInstanceOf[Class[I]])
Expand Down

0 comments on commit 22bba05

Please sign in to comment.