-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Template parts handling approach change: expression level instead of LazyParameter level #7174
Changes from 2 commits
5ebf989
580adec
1539ae5
9d5258c
ac643c3
9bdac29
2e84089
a1dccbd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,15 @@ | ||||||||||||||||||||||||||||||||||||||||||||||
package pl.touk.nussknacker.engine.api | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
case class TemplateEvaluationResult(renderedParts: List[TemplateRenderedPart]) { | ||||||||||||||||||||||||||||||||||||||||||||||
def renderedTemplate: String = renderedParts.map(_.value).mkString("") | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
sealed trait TemplateRenderedPart { | ||||||||||||||||||||||||||||||||||||||||||||||
def value: String | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
object TemplateRenderedPart { | ||||||||||||||||||||||||||||||||||||||||||||||
case class RenderedLiteral(value: String) extends TemplateRenderedPart | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
case class RenderedSubExpression(value: String) extends TemplateRenderedPart | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+11
to
+15
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add documentation and validation for rendered part types The case classes need:
object TemplateRenderedPart {
+ /**
+ * Represents a literal string part of the template that should be rendered as-is.
+ * @param value The literal string value (must not be null)
+ */
- case class RenderedLiteral(value: String) extends TemplateRenderedPart
+ case class RenderedLiteral(value: String) extends TemplateRenderedPart {
+ require(value != null, "value cannot be null")
+ }
+ /**
+ * Represents a dynamically evaluated expression part of the template.
+ * @param value The result of expression evaluation (must not be null)
+ */
- case class RenderedSubExpression(value: String) extends TemplateRenderedPart
+ case class RenderedSubExpression(value: String) extends TemplateRenderedPart {
+ require(value != null, "value cannot be null")
+ }
} 📝 Committable suggestion
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In Scala, we idiomatically assume that null can't be passed anywhere. Thanks to that we don't need to add such assertions and the code is less noisy. Please don't add such suggestions in the review anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arkadius, understood. I will avoid suggesting null value assertions in Scala code in future reviews, keeping in mind the idiomatic practices. Thank you for the clarification. ✏️ Learnings added
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,9 @@ To see the biggest differences please consult the [changelog](Changelog.md). | |
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Removed unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`. | ||
`MultiMap` was incorrectly handled by Flink's default Kryo serializer, so if you want to copy it to your code | ||
you should write and register a proper serializer. | ||
* [#7162](https://github.com/TouK/nussknacker/pull/7162) When component declares that requires parameter with either `SpelTemplateParameterEditor` | ||
or `SqlParameterEditor` editor, in the runtime, for the expression evaluation result, will be used the new `TemplateEvaluationResult` | ||
class instead of `String` class. To access the previous `String` use `TemplateEvaluationResult.renderedTemplate` method. | ||
Comment on lines
+55
to
+57
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Codebase verification Breaking change in template evaluation requires attention Based on the codebase analysis, this is a significant change affecting components using
The migration guide correctly states that:
The implementation shows this is a breaking change that requires updates to any custom components using these editors. 🔗 Analysis chainTemplate evaluation result change needs attention The change in template evaluation result handling requires careful migration:
🏁 Scripts executedThe following scripts were executed for the analysis: Script: #!/bin/bash
# Find components that may be affected by this change
ast-grep --pattern 'class $_ {
$$$
@MethodToInvoke
def $_($_, editor = SpelTemplateParameterEditor) {
$$$
}
$$$
}'
Length of output: 130 Script: #!/bin/bash
# Let's try a different approach to find usages of these editors
# First, let's search for SpelTemplateParameterEditor
rg "SpelTemplateParameterEditor" -A 2 -B 2
# Also search for SqlParameterEditor
rg "SqlParameterEditor" -A 2 -B 2
# Let's also check for TemplateEvaluationResult to understand the change
rg "TemplateEvaluationResult" -A 2 -B 2
Length of output: 21936 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. components in dev-model are not exposed publicly so it is not important for an end user, these same with classes inside tests modules There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arkadius Understood, thanks for the clarification. Since components in the ✏️ Learnings added
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not exactly. It impacts the end-user, or more precisely a user who developed their own component using these editors. However the same user is not interested in changes in internal components which are only for demonstration of the feature/unit testing purpose There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @arkadius Thank you for the clarification. You are absolutely right. The breaking change impacts end users who have developed their own components using ✏️ Learnings added
|
||
|
||
### REST API changes | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
package pl.touk.nussknacker.engine.flink | ||
|
||
import com.typesafe.config.ConfigFactory | ||
import org.apache.flink.api.common.functions.FlatMapFunction | ||
import org.apache.flink.api.connector.source.Boundedness | ||
import org.apache.flink.streaming.api.datastream.DataStream | ||
import org.apache.flink.util.Collector | ||
import org.scalatest.funsuite.AnyFunSuite | ||
import org.scalatest.matchers.should.Matchers | ||
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression} | ||
import pl.touk.nussknacker.engine.api._ | ||
import pl.touk.nussknacker.engine.api.component.{BoundedStreamComponent, ComponentDefinition} | ||
import pl.touk.nussknacker.engine.api.context.ValidationContext | ||
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedLazyParameter, NodeDependencyValue, SingleInputDynamicComponent} | ||
import pl.touk.nussknacker.engine.api.definition.{NodeDependency, OutputVariableNameDependency, Parameter, SpelTemplateParameterEditor} | ||
import pl.touk.nussknacker.engine.api.parameter.ParameterName | ||
import pl.touk.nussknacker.engine.api.typed.typing.Typed | ||
import pl.touk.nussknacker.engine.build.ScenarioBuilder | ||
import pl.touk.nussknacker.engine.flink.api.process.{AbstractOneParamLazyParameterFunction, FlinkCustomNodeContext, FlinkCustomStreamTransformation} | ||
import pl.touk.nussknacker.engine.flink.test.FlinkSpec | ||
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._ | ||
import pl.touk.nussknacker.engine.graph.expression.Expression | ||
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode | ||
import pl.touk.nussknacker.engine.spel.SpelExtension._ | ||
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner | ||
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage | ||
|
||
class SpelTemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers with ValidatedValuesDetailedMessage { | ||
|
||
private lazy val runner = TestScenarioRunner | ||
.flinkBased(ConfigFactory.empty(), flinkMiniCluster) | ||
.withExecutionMode(ExecutionMode.Batch) | ||
.withExtraComponents( | ||
List(ComponentDefinition("spelTemplatePartsCustomTransformer", SpelTemplatePartsCustomTransformer)) | ||
) | ||
.build() | ||
|
||
test("flink custom transformer using spel template rendered parts") { | ||
val scenario = ScenarioBuilder | ||
.streaming("test") | ||
.source("source", TestScenarioRunner.testDataSource) | ||
.customNode( | ||
"custom", | ||
"output", | ||
"spelTemplatePartsCustomTransformer", | ||
"template" -> Expression.spelTemplate(s"Hello#{#input}") | ||
) | ||
.emptySink("sink", TestScenarioRunner.testResultSink, "value" -> "#output".spel) | ||
|
||
val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED) | ||
result.validValue.errors shouldBe empty | ||
result.validValue.successes shouldBe List( | ||
"[Hello]-literal[1]-subexpression", | ||
"[Hello]-literal[2]-subexpression", | ||
"[Hello]-literal[3]-subexpression" | ||
) | ||
} | ||
|
||
} | ||
|
||
object SpelTemplatePartsCustomTransformer | ||
extends CustomStreamTransformer | ||
with SingleInputDynamicComponent[FlinkCustomStreamTransformation] | ||
with BoundedStreamComponent { | ||
|
||
private val spelTemplateParameterName = ParameterName("template") | ||
|
||
private val spelTemplateParameter = Parameter | ||
.optional[String](spelTemplateParameterName) | ||
.copy( | ||
isLazyParameter = true, | ||
editor = Some(SpelTemplateParameterEditor) | ||
) | ||
|
||
override type State = Unit | ||
|
||
override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])( | ||
implicit nodeId: NodeId | ||
): SpelTemplatePartsCustomTransformer.ContextTransformationDefinition = { | ||
case TransformationStep(Nil, _) => NextParameters(List(spelTemplateParameter)) | ||
case TransformationStep((`spelTemplateParameterName`, DefinedLazyParameter(_)) :: Nil, _) => | ||
val outName = OutputVariableNameDependency.extract(dependencies) | ||
FinalResults(context.withVariableUnsafe(outName, Typed[String]), List.empty) | ||
} | ||
|
||
override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency) | ||
|
||
override def implementation( | ||
params: Params, | ||
dependencies: List[NodeDependencyValue], | ||
finalState: Option[Unit] | ||
): FlinkCustomStreamTransformation = { | ||
val templateLazyParam: LazyParameter[TemplateEvaluationResult] = | ||
params.extractUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName) | ||
FlinkCustomStreamTransformation { | ||
(dataStream: DataStream[Context], flinkCustomNodeContext: FlinkCustomNodeContext) => | ||
dataStream.flatMap( | ||
new AbstractOneParamLazyParameterFunction[TemplateEvaluationResult]( | ||
templateLazyParam, | ||
flinkCustomNodeContext.lazyParameterHelper | ||
) with FlatMapFunction[Context, ValueWithContext[String]] { | ||
override def flatMap(value: Context, out: Collector[ValueWithContext[String]]): Unit = { | ||
collectHandlingErrors(value, out) { | ||
val templateResult = evaluateParameter(value) | ||
val result = templateResult.renderedParts.map { | ||
case RenderedLiteral(value) => s"[$value]-literal" | ||
case RenderedSubExpression(value) => s"[$value]-subexpression" | ||
}.mkString | ||
ValueWithContext(result, value) | ||
} | ||
} | ||
}, | ||
flinkCustomNodeContext.valueWithContextInfo.forClass[String] | ||
).asInstanceOf[DataStream[ValueWithContext[AnyRef]]] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid unsafe type casting. The Example safer approach: // Option 1: Use pattern matching with type checking
.map {
case value: DataStream[ValueWithContext[AnyRef]] => value
case other => throw new IllegalStateException(s"Unexpected type: ${other.getClass}")
}
// Option 2: Use type parameters to ensure type safety at compile time
def implementation[T <: AnyRef](...): FlinkCustomStreamTransformation = {
...
} |
||
} | ||
} | ||
|
||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add validation and optimize string concatenation
The implementation is clean but could benefit from some improvements:
📝 Committable suggestion