Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NU-1800] Add TemplateEvaluationResult to evaluate SpEL expression parts in LazyParameter #7162

Merged
merged 15 commits into from
Nov 20, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package pl.touk.nussknacker.engine.api

case class TemplateEvaluationResult(renderedParts: List[TemplateRenderedPart]) {
def renderedTemplate: String = renderedParts.map(_.value).mkString("")
}

sealed trait TemplateRenderedPart {
def value: String
}

object TemplateRenderedPart {
case class RenderedLiteral(value: String) extends TemplateRenderedPart

case class RenderedSubExpression(value: String) extends TemplateRenderedPart
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ object DatabaseQueryEnricher {

final val queryParamName: ParameterName = ParameterName("Query")

final val queryParamDeclaration =
ParameterDeclaration
.mandatory[String](queryParamName)
.withCreator(modify = _.copy(editor = Some(SqlParameterEditor)))
final val queryParam = Parameter[String](queryParamName).copy(editor = Some(SqlParameterEditor))

final val resultStrategyParamName: ParameterName = ParameterName("Result strategy")

Expand Down Expand Up @@ -132,7 +129,7 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid
): ContextTransformationDefinition = { case TransformationStep(Nil, _) =>
NextParameters(parameters =
resultStrategyParamDeclaration.createParameter() ::
queryParamDeclaration.createParameter() ::
queryParam ::
cacheTTLParamDeclaration.createParameter() :: Nil
)
}
Expand All @@ -142,14 +139,15 @@ class DatabaseQueryEnricher(val dbPoolConfig: DBPoolConfig, val dbMetaDataProvid
): ContextTransformationDefinition = {
case TransformationStep(
(`resultStrategyParamName`, DefinedEagerParameter(strategyName: String, _)) ::
(`queryParamName`, DefinedEagerParameter(query: String, _)) ::
(`queryParamName`, DefinedEagerParameter(query: TemplateEvaluationResult, _)) ::
(`cacheTTLParamName`, _) :: Nil,
None
) =>
if (query.isEmpty) {
val renderedQuery = query.renderedTemplate
if (renderedQuery.isEmpty) {
FinalResults(context, errors = CustomNodeError("Query is missing", Some(queryParamName)) :: Nil, state = None)
} else {
parseQuery(context, dependencies, strategyName, query)
parseQuery(context, dependencies, strategyName, renderedQuery)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package pl.touk.nussknacker.sql.service

import pl.touk.nussknacker.engine.api.TemplateRenderedPart.RenderedLiteral
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedEagerParameter, OutputVariableNameValue}
import pl.touk.nussknacker.engine.api.context.{OutputVar, ValidationContext}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, Unknown}
import pl.touk.nussknacker.engine.api.NodeId
import pl.touk.nussknacker.engine.api.{NodeId, TemplateEvaluationResult}
import pl.touk.nussknacker.sql.db.query.{ResultSetStrategy, SingleResultStrategy}
import pl.touk.nussknacker.sql.db.schema.MetaDataProviderFactory
import pl.touk.nussknacker.sql.utils.BaseHsqlQueryEnricherTest
Expand Down Expand Up @@ -32,8 +33,10 @@ class DatabaseQueryEnricherValidationTest extends BaseHsqlQueryEnricherTest {
service.TransformationStep(
List(
DatabaseQueryEnricher.resultStrategyParamName -> eagerValueParameter(SingleResultStrategy.name),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter("select from"),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter(
TemplateEvaluationResult(List(RenderedLiteral("select from")))
),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
),
None
)
Expand Down Expand Up @@ -62,8 +65,10 @@ class DatabaseQueryEnricherValidationTest extends BaseHsqlQueryEnricherTest {
service.TransformationStep(
List(
DatabaseQueryEnricher.resultStrategyParamName -> eagerValueParameter(ResultSetStrategy.name),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter("select * from persons"),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
DatabaseQueryEnricher.queryParamName -> eagerValueParameter(
TemplateEvaluationResult(List(RenderedLiteral("select * from persons")))
),
DatabaseQueryEnricher.cacheTTLParamName -> eagerValueParameter(Duration.ofMinutes(1)),
),
None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import pl.touk.nussknacker.engine.definition.component.methodbased.MethodBasedCo
import pl.touk.nussknacker.engine.definition.component.{ComponentStaticDefinition, FragmentSpecificData}
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api.TemplateEvaluationResult
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypingResult}
import pl.touk.nussknacker.restmodel.definition._
import pl.touk.nussknacker.ui.definition.DefinitionsService.{
ComponentUiConfigMode,
Expand Down Expand Up @@ -162,7 +164,7 @@ object DefinitionsService {
def createUIParameter(parameter: Parameter): UIParameter = {
UIParameter(
name = parameter.name.value,
typ = parameter.typ,
typ = toUIType(parameter.typ),
editor = parameter.finalEditor,
defaultValue = parameter.finalDefaultValue,
additionalVariables = parameter.additionalVariables.mapValuesNow(_.typingResult),
Expand All @@ -174,6 +176,10 @@ object DefinitionsService {
)
}

private def toUIType(typingResult: TypingResult): TypingResult = {
if (typingResult == Typed[TemplateEvaluationResult]) Typed[String] else typingResult
}

def createUIScenarioPropertyConfig(config: ScenarioPropertyConfig): UiScenarioPropertyConfig = {
val editor = UiScenarioPropertyEditorDeterminer.determine(config)
UiScenarioPropertyConfig(config.defaultValue, editor, config.label, config.hintText)
Expand Down
2 changes: 2 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* [#7145](https://github.com/TouK/nussknacker/pull/7145) Lift TypingResult information for dictionaries
* [#7116](https://github.com/TouK/nussknacker/pull/7116) Improve missing Flink Kafka Source / Sink TypeInformation
* [#7123](https://github.com/TouK/nussknacker/pull/7123) Fix deployments for scenarios with dict editors after model reload
* [#7162](https://github.com/TouK/nussknacker/pull/7162) Component API enhancement: ability to access information about
expression parts used in SpEL template

## 1.18

Expand Down
3 changes: 3 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* [#6988](https://github.com/TouK/nussknacker/pull/6988) Removed unused API classes: `MultiMap`, `TimestampedEvictableStateFunction`.
`MultiMap` was incorrectly handled by Flink's default Kryo serializer, so if you want to copy it to your code
you should write and register a proper serializer.
* [#7162](https://github.com/TouK/nussknacker/pull/7162) When component declares that requires parameter with either `SpelTemplateParameterEditor`
or `SqlParameterEditor` editor, in the runtime, for the expression evaluation result, will be used the new `TemplateEvaluationResult`
class instead of `String` class. To access the previous `String` use `TemplateEvaluationResult.renderedTemplate` method.

### REST API changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package pl.touk.nussknacker.engine.flink

import com.typesafe.config.ConfigFactory
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.connector.source.Boundedness
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.util.Collector
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression}
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{BoundedStreamComponent, ComponentDefinition}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.context.transformation.{DefinedLazyParameter, NodeDependencyValue, SingleInputDynamicComponent}
import pl.touk.nussknacker.engine.api.definition.{NodeDependency, OutputVariableNameDependency, Parameter, SpelTemplateParameterEditor}
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.api.process.{AbstractOneParamLazyParameterFunction, FlinkCustomNodeContext, FlinkCustomStreamTransformation}
import pl.touk.nussknacker.engine.flink.test.FlinkSpec
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.process.FlinkJobConfig.ExecutionMode
import pl.touk.nussknacker.engine.spel.SpelExtension._
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner
import pl.touk.nussknacker.test.ValidatedValuesDetailedMessage

class SpelTemplateLazyParameterTest extends AnyFunSuite with FlinkSpec with Matchers with ValidatedValuesDetailedMessage {

private lazy val runner = TestScenarioRunner
.flinkBased(ConfigFactory.empty(), flinkMiniCluster)
.withExecutionMode(ExecutionMode.Batch)
.withExtraComponents(
List(ComponentDefinition("spelTemplatePartsCustomTransformer", SpelTemplatePartsCustomTransformer))
)
.build()

test("flink custom transformer using spel template rendered parts") {
val scenario = ScenarioBuilder
.streaming("test")
.source("source", TestScenarioRunner.testDataSource)
.customNode(
"custom",
"output",
"spelTemplatePartsCustomTransformer",
"template" -> Expression.spelTemplate(s"Hello#{#input}")
)
.emptySink("sink", TestScenarioRunner.testResultSink, "value" -> "#output".spel)

val result = runner.runWithData(scenario, List(1, 2, 3), Boundedness.BOUNDED)
result.validValue.errors shouldBe empty
result.validValue.successes shouldBe List(
"[Hello]-literal[1]-subexpression",
"[Hello]-literal[2]-subexpression",
"[Hello]-literal[3]-subexpression"
)
}

}

object SpelTemplatePartsCustomTransformer
extends CustomStreamTransformer
with SingleInputDynamicComponent[FlinkCustomStreamTransformation]
with BoundedStreamComponent {

private val spelTemplateParameterName = ParameterName("template")

private val spelTemplateParameter = Parameter
.optional[String](spelTemplateParameterName)
.copy(
isLazyParameter = true,
editor = Some(SpelTemplateParameterEditor)
)

override type State = Unit

override def contextTransformation(context: ValidationContext, dependencies: List[NodeDependencyValue])(
implicit nodeId: NodeId
): SpelTemplatePartsCustomTransformer.ContextTransformationDefinition = {
case TransformationStep(Nil, _) => NextParameters(List(spelTemplateParameter))
case TransformationStep((`spelTemplateParameterName`, DefinedLazyParameter(_)) :: Nil, _) =>
val outName = OutputVariableNameDependency.extract(dependencies)
FinalResults(context.withVariableUnsafe(outName, Typed[String]), List.empty)
}

override def nodeDependencies: List[NodeDependency] = List(OutputVariableNameDependency)

override def implementation(
params: Params,
dependencies: List[NodeDependencyValue],
finalState: Option[Unit]
): FlinkCustomStreamTransformation = {
val templateLazyParam: LazyParameter[TemplateEvaluationResult] =
params.extractUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName)
FlinkCustomStreamTransformation {
(dataStream: DataStream[Context], flinkCustomNodeContext: FlinkCustomNodeContext) =>
dataStream.flatMap(
new AbstractOneParamLazyParameterFunction[TemplateEvaluationResult](
templateLazyParam,
flinkCustomNodeContext.lazyParameterHelper
) with FlatMapFunction[Context, ValueWithContext[String]] {
override def flatMap(value: Context, out: Collector[ValueWithContext[String]]): Unit = {
collectHandlingErrors(value, out) {
val templateResult = evaluateParameter(value)
val result = templateResult.renderedParts.map {
case RenderedLiteral(value) => s"[$value]-literal"
case RenderedSubExpression(value) => s"[$value]-subexpression"
}.mkString
ValueWithContext(result, value)
}
}
},
flinkCustomNodeContext.valueWithContextInfo.forClass[String]
).asInstanceOf[DataStream[ValueWithContext[AnyRef]]]
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ object LoggingService extends EagerService {
def prepare(
@ParamName("logger") @Nullable loggerName: String,
@ParamName("level") @DefaultValue("T(org.slf4j.event.Level).DEBUG") level: Level,
@ParamName("message") @SimpleEditor(`type` = SimpleEditorType.SPEL_TEMPLATE_EDITOR) message: LazyParameter[String]
@ParamName("message") @SimpleEditor(`type` = SimpleEditorType.SPEL_TEMPLATE_EDITOR) message: LazyParameter[
TemplateEvaluationResult
]
)(implicit metaData: MetaData, nodeId: NodeId): ServiceInvoker =
new ServiceInvoker {

Expand All @@ -31,7 +33,7 @@ object LoggingService extends EagerService {
collector: ServiceInvocationCollector,
componentUseCase: ComponentUseCase
): Future[Any] = {
val msg = message.evaluate(context)
val msg = message.evaluate(context).renderedTemplate
level match {
case Level.TRACE => logger.trace(msg)
case Level.DEBUG => logger.debug(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import pl.touk.nussknacker.engine.api.component.UnboundedStreamComponent
import pl.touk.nussknacker.engine.api.editor.{SimpleEditor, SimpleEditorType}
import pl.touk.nussknacker.engine.api.process.SourceFactory
import pl.touk.nussknacker.engine.api.typed.typing.Unknown
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName}
import pl.touk.nussknacker.engine.api.{MethodToInvoke, ParamName, TemplateEvaluationResult}
import pl.touk.nussknacker.engine.flink.util.source.CollectionSource

//It's only for test FE sql editor
object SqlSource extends SourceFactory with UnboundedStreamComponent {

@MethodToInvoke
def source(@ParamName("sql") @SimpleEditor(`type` = SimpleEditorType.SQL_EDITOR) sql: String) =
def source(@ParamName("sql") @SimpleEditor(`type` = SimpleEditorType.SQL_EDITOR) sql: TemplateEvaluationResult) =
new CollectionSource[Any](List.empty, None, Unknown)

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ package pl.touk.nussknacker.engine.schemedkafka

import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.KafkaSchemaRegistryBasedValueSerializationSchemaFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.UniversalSchemaBasedSerdeProvider.createSchemaIdFromMessageExtractor
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{UniversalKafkaDeserializerFactory, UniversalSchemaValidator, UniversalSerializerFactory, UniversalToJsonFormatterFactory}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.universal.{
UniversalKafkaDeserializerFactory,
UniversalSchemaValidator,
UniversalSerializerFactory,
UniversalToJsonFormatterFactory
}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaBasedSerdeProvider, SchemaRegistryClientFactory}
import pl.touk.nussknacker.engine.schemedkafka.source.flink.FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.circe.{Decoder, Encoder}
import pl.touk.nussknacker.engine.api.definition.ParameterEditor
import pl.touk.nussknacker.engine.api.typed.supertype.ReturningSingleClassPromotionStrategy
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.api.{Hidden, HideToString}
import pl.touk.nussknacker.engine.api.{Hidden, HideToString, TemplateEvaluationResult}

import java.lang.reflect.{AccessibleObject, Member, Method}
import java.text.NumberFormat
Expand Down Expand Up @@ -109,8 +109,9 @@ object ClassExtractionSettings {
// we want only boxed types
ClassPredicate { case cl => cl.isPrimitive },
ExactClassPredicate[ReturningSingleClassPromotionStrategy],
// We use this type only programmable
// We use these types only programmable
ClassNamePredicate("pl.touk.nussknacker.engine.spel.SpelExpressionRepr"),
ExactClassPredicate[TemplateEvaluationResult],
)

lazy val ExcludedCollectionFunctionalClasses: List[ClassPredicate] = List(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class OrderedDependencies(dependencies: List[NodeDependency]) extends Serializab
): List[Any] = {
dependencies.map {
case param: Parameter =>
values.getOrElse(param.name, throw new IllegalArgumentException(s"Missing parameter: ${param.name}"))
values.getOrElse(param.name, throw new IllegalArgumentException(s"Missing parameter: ${param.name.value}"))
case OutputVariableNameDependency =>
outputVariableNameOpt.getOrElse(throw MissingOutputVariableException)
case TypedNodeDependency(clazz) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import org.springframework.expression.spel.{
SpelParserConfiguration,
standard
}
import pl.touk.nussknacker.engine.api.Context
import pl.touk.nussknacker.engine.api.TemplateRenderedPart.{RenderedLiteral, RenderedSubExpression}
import pl.touk.nussknacker.engine.api.context.ValidationContext
import pl.touk.nussknacker.engine.api.dict.DictRegistry
import pl.touk.nussknacker.engine.api.exception.NonTransientException
import pl.touk.nussknacker.engine.api.generics.ExpressionParseError
import pl.touk.nussknacker.engine.api.typed.typing
import pl.touk.nussknacker.engine.api.typed.typing.{SingleTypingResult, TypingResult}
import pl.touk.nussknacker.engine.api.{Context, TemplateEvaluationResult, TemplateRenderedPart}
import pl.touk.nussknacker.engine.definition.clazz.ClassDefinitionSet
import pl.touk.nussknacker.engine.definition.globalvariables.ExpressionConfigDefinition
import pl.touk.nussknacker.engine.dict.{KeysDictTyper, LabelsDictTyper}
Expand Down Expand Up @@ -107,7 +108,28 @@ class SpelExpression(
return SpelExpressionRepr(parsed.parsed, ctx, globals, original).asInstanceOf[T]
}
val evaluationContext = evaluationContextPreparer.prepareEvaluationContext(ctx, globals)
parsed.getValue[T](evaluationContext, expectedClass)
flavour match {
case SpelExpressionParser.Standard =>
parsed.getValue[T](evaluationContext, expectedClass)
case SpelExpressionParser.Template =>
val parts = renderTemplateExpressionParts(evaluationContext)
TemplateEvaluationResult(parts).asInstanceOf[T]
}
}

private def renderTemplateExpressionParts(evaluationContext: EvaluationContext): List[TemplateRenderedPart] = {
def renderExpression(expression: Expression): List[TemplateRenderedPart] = expression match {
case literal: LiteralExpression => List(RenderedLiteral(literal.getExpressionString))
case spelExpr: org.springframework.expression.spel.standard.SpelExpression =>
// TODO: Should we use the same trick with re-parsing after ClassCastException as we use in ParsedSpelExpression?
List(RenderedSubExpression(spelExpr.getValue[String](evaluationContext, classOf[String])))
case composite: CompositeStringExpression => composite.getExpressions.toList.flatMap(renderExpression)
case other =>
throw new IllegalArgumentException(
s"Unsupported expression type: ${other.getClass.getName} for a template expression"
)
}
renderExpression(parsed.parsed)
}

private def logOnException[A](ctx: Context)(block: => A): A = {
Expand Down
Loading
Loading