-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring: FlinkSource API and test source API. #1510
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good change, some comments added.
.../nussknacker/engine/avro/schemaregistry/confluent/formatter/ConfluentAvroMessageReader.scala
Outdated
Show resolved
Hide resolved
engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala
Outdated
Show resolved
Hide resolved
...i/src/main/scala/pl/touk/nussknacker/engine/flink/util/context/FlinkContextInitializer.scala
Outdated
Show resolved
Hide resolved
*/ | ||
abstract class FlinkContextInitializer[T] extends Serializable { | ||
|
||
def validationContext(context: ValidationContext, name: String, result: typing.TypingResult)(implicit nodeId: NodeId): ValidationContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why we need this
name
here? Is it a name in case when we produce only one variable? - Why we need
result
here? What is it? - Shouldn't both be passed as a parameters of constructor in very specific case whene source produces only one variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 & 2. Changed name -> outputVariableName and result -> outputVariableType and fixed documentation. Those parameters represent default one variable produced by the source.
3. For multiple variables see GenericSourceWithCustomVariablesSample
. There is one val customContextInitilalizer
which is used in two places:
- creating the source
- contextTransformation - This is the place where I know all variables and their types (especially in avro sources), and pass them as parameters to the method of already created object.
I see two alternatives:
- pass a List of variable name-and-type pairs
- initialize
var customContextInitilalizer
with new FlinkContextInitializer inside contextTransformation
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see now. WDYT about passing to this validationContext
method everything what we have in TransformationStep
? I know that there are type dependent types but we can have parameters: List[(String, BaseDefinedParameter)], state: Option[Any]
and someone could do some pattern matching on it. WDYT? Thanks to that we will have the whole power of GenericNodeTransformation
in this place.
...k/api/src/main/scala/pl/touk/nussknacker/engine/flink/util/context/InitContextFunction.scala
Outdated
Show resolved
Hide resolved
engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala
Outdated
Show resolved
Hide resolved
engine/flink/api/src/main/scala/pl/touk/nussknacker/engine/flink/api/process/FlinkSource.scala
Outdated
Show resolved
Hide resolved
...i/src/main/scala/pl/touk/nussknacker/engine/flink/util/context/FlinkContextInitializer.scala
Outdated
Show resolved
Hide resolved
*/ | ||
abstract class FlinkContextInitializer[T] extends Serializable { | ||
|
||
def validationContext(context: ValidationContext, name: String, result: typing.TypingResult)(implicit nodeId: NodeId): ValidationContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see now. WDYT about passing to this validationContext
method everything what we have in TransformationStep
? I know that there are type dependent types but we can have parameters: List[(String, BaseDefinedParameter)], state: Option[Any]
and someone could do some pattern matching on it. WDYT? Thanks to that we will have the whole power of GenericNodeTransformation
in this place.
...k/api/src/main/scala/pl/touk/nussknacker/engine/flink/util/context/InitContextFunction.scala
Outdated
Show resolved
Hide resolved
…kContextInitializer.
No description provided.